Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e92b4ad

Browse filesBrowse files
committed
Add a retry mechanism for AMQP messages
Add some tests and remove the intrication between the receiver and the connection Configure each TTL individually and the dead routing key The `ttls` array is 0-indexed Update the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values) Catches failed retries and forward other messages' attributes
1 parent f96753b commit e92b4ad
Copy full SHA for e92b4ad

File tree

8 files changed

+342
-18
lines changed
Filter options

8 files changed

+342
-18
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
<argument type="service" id="messenger.transport.encoder" />
6565
<argument type="service" id="messenger.transport.decoder" />
6666
<argument>%kernel.debug%</argument>
67+
<argument type="service" id="logger" on-invalid="null" />
6768

6869
<tag name="messenger.transport_factory" />
6970
</service>

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+55-1Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
6969
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
7070
$connection->method('get')->willReturn($envelope);
7171

72-
$connection->expects($this->once())->method('nack')->with($envelope);
72+
$connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE);
7373

7474
$receiver = new AmqpReceiver($serializer, $connection);
7575
$receiver->receive(function () {
@@ -101,6 +101,60 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
101101
throw new WillNeverWorkException('Well...');
102102
});
103103
}
104+
105+
public function testItPublishesTheMessageForRetryIfSuchConfiguration()
106+
{
107+
$serializer = new Serializer(
108+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
109+
);
110+
111+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
112+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
113+
$envelope->method('getHeaders')->willReturn(array(
114+
'type' => DummyMessage::class,
115+
));
116+
117+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
118+
$connection->method('get')->willReturn($envelope);
119+
$connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3)));
120+
$connection->method('publishForRetry')->with($envelope)->willReturn(true);
121+
122+
$connection->expects($this->once())->method('ack')->with($envelope);
123+
124+
$receiver = new AmqpReceiver($serializer, $connection);
125+
$receiver->receive(function (Envelope $envelope) use ($receiver) {
126+
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
127+
$receiver->stop();
128+
});
129+
}
130+
131+
/**
132+
* @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException
133+
*/
134+
public function testItThrowsTheExceptionIfTheRetryPublishDidNotWork()
135+
{
136+
$serializer = new Serializer(
137+
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
138+
);
139+
140+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
141+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
142+
$envelope->method('getHeaders')->willReturn(array(
143+
'type' => DummyMessage::class,
144+
));
145+
146+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
147+
$connection->method('get')->willReturn($envelope);
148+
$connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3)));
149+
$connection->method('publishForRetry')->with($envelope)->willReturn(false);
150+
151+
$connection->expects($this->never())->method('ack')->with($envelope);
152+
153+
$receiver = new AmqpReceiver($serializer, $connection);
154+
$receiver->receive(function () {
155+
throw new InterruptException('Well...');
156+
});
157+
}
104158
}
105159

106160
class InterruptException extends \Exception

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public function testItCreatesTheTransport()
4141
true
4242
);
4343

44-
$expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), array('foo' => 'bar'), true);
44+
$expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true));
4545

4646
$this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar')));
4747
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
+92Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,98 @@ public function testItCanDisableTheSetup()
189189
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', array(), true, $factory);
190190
$connection->publish('body');
191191
}
192+
193+
public function testItRetriesTheMessage()
194+
{
195+
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
196+
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
197+
$retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
198+
199+
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
200+
$factory->method('createConnection')->willReturn($amqpConnection);
201+
$factory->method('createChannel')->willReturn($amqpChannel);
202+
$factory->method('createQueue')->willReturn($retryQueue);
203+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
204+
$retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
205+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
206+
));
207+
208+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
209+
$amqpExchange->method('getName')->willReturn('messages');
210+
211+
$retryExchange->expects($this->once())->method('setName')->with('retry');
212+
$retryExchange->expects($this->once())->method('declareExchange');
213+
$retryExchange->method('getName')->willReturn('retry');
214+
215+
$retryQueue->expects($this->once())->method('setName')->with('retry_queue_1');
216+
$retryQueue->expects($this->once())->method('setArguments')->with(array(
217+
'x-message-ttl' => 10000,
218+
'x-dead-letter-exchange' => 'messages',
219+
));
220+
221+
$retryQueue->expects($this->once())->method('declareQueue');
222+
$retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_1');
223+
224+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
225+
$envelope->method('getHeader')->with('symfony-messenger-attempts')->willReturn(false);
226+
$envelope->method('getHeaders')->willReturn(array('x-some-headers' => 'foo'));
227+
$envelope->method('getBody')->willReturn('{}');
228+
229+
$retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_1', AMQP_NOPARAM, array('headers' => array('x-some-headers' => 'foo', 'symfony-messenger-attempts' => 1)));
230+
231+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', array('retry' => array('attempts' => 3)), false, $factory);
232+
$connection->publishForRetry($envelope);
233+
}
234+
235+
public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs()
236+
{
237+
$amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock();
238+
$amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock();
239+
$retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock();
240+
241+
$factory = $this->getMockBuilder(AmqpFactory::class)->getMock();
242+
$factory->method('createConnection')->willReturn($amqpConnection);
243+
$factory->method('createChannel')->willReturn($amqpChannel);
244+
$factory->method('createQueue')->willReturn($retryQueue);
245+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
246+
$retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
247+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
248+
));
249+
250+
$amqpExchange->expects($this->once())->method('setName')->with('messages');
251+
$amqpExchange->method('getName')->willReturn('messages');
252+
253+
$retryExchange->expects($this->once())->method('setName')->with('retry');
254+
$retryExchange->expects($this->once())->method('declareExchange');
255+
$retryExchange->method('getName')->willReturn('retry');
256+
257+
$connectionOptions = array(
258+
'retry' => array(
259+
'attempts' => 3,
260+
'dead_routing_key' => 'my_dead_routing_key',
261+
'ttl' => array(30000, 60000, 120000),
262+
),
263+
);
264+
265+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, false, $factory);
266+
267+
$messageRetriedTwice = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
268+
$messageRetriedTwice->method('getHeader')->with('symfony-messenger-attempts')->willReturn('2');
269+
$messageRetriedTwice->method('getHeaders')->willReturn(array('symfony-messenger-attempts' => '2'));
270+
$messageRetriedTwice->method('getBody')->willReturn('{}');
271+
272+
$retryQueue->expects($this->once())->method('setName')->with('retry_queue_3');
273+
$retryQueue->expects($this->once())->method('setArguments')->with(array(
274+
'x-message-ttl' => 120000,
275+
'x-dead-letter-exchange' => 'messages',
276+
));
277+
278+
$retryQueue->expects($this->once())->method('declareQueue');
279+
$retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_3');
280+
281+
$retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_3', AMQP_NOPARAM, array('headers' => array('symfony-messenger-attempts' => 3)));
282+
$connection->publishForRetry($messageRetriedTwice);
283+
}
192284
}
193285

194286
class TestAmqpFactory extends AmqpFactory

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+29-4Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Psr\Log\LoggerInterface;
1415
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
1516
use Symfony\Component\Messenger\Transport\ReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
@@ -22,14 +23,18 @@
2223
*/
2324
class AmqpReceiver implements ReceiverInterface
2425
{
26+
private const DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS = 200000;
27+
2528
private $decoder;
2629
private $connection;
30+
private $logger;
2731
private $shouldStop;
2832

29-
public function __construct(DecoderInterface $decoder, Connection $connection)
33+
public function __construct(DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null)
3034
{
3135
$this->decoder = $decoder;
3236
$this->connection = $connection;
37+
$this->logger = $logger;
3338
}
3439

3540
/**
@@ -39,10 +44,11 @@ public function receive(callable $handler): void
3944
{
4045
while (!$this->shouldStop) {
4146
$AMQPEnvelope = $this->connection->get();
47+
4248
if (null === $AMQPEnvelope) {
4349
$handler(null);
4450

45-
usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000);
51+
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? self::DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS);
4652
if (\function_exists('pcntl_signal_dispatch')) {
4753
pcntl_signal_dispatch();
4854
}
@@ -62,9 +68,25 @@ public function receive(callable $handler): void
6268

6369
throw $e;
6470
} catch (\Throwable $e) {
65-
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
71+
try {
72+
$retried = $this->connection->publishForRetry($AMQPEnvelope);
73+
} catch (\Throwable $retryException) {
74+
$this->logger && $this->logger->warning(sprintf('Retrying message #%s failed. Requeuing it now.', $AMQPEnvelope->getMessageId()), array(
75+
'retryException' => $retryException,
76+
'exception' => $e,
77+
));
6678

67-
throw $e;
79+
$retried = false;
80+
}
81+
82+
if (!$retried) {
83+
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
84+
85+
throw $e;
86+
}
87+
88+
// Acknowledge current message as another one as been requeued.
89+
$this->connection->ack($AMQPEnvelope);
6890
} finally {
6991
if (\function_exists('pcntl_signal_dispatch')) {
7092
pcntl_signal_dispatch();
@@ -73,6 +95,9 @@ public function receive(callable $handler): void
7395
}
7496
}
7597

98+
/**
99+
* {@inheritdoc}
100+
*/
76101
public function stop(): void
77102
{
78103
$this->shouldStop = true;

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Psr\Log\LoggerInterface;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
@@ -26,12 +27,14 @@ class AmqpTransport implements TransportInterface
2627
private $connection;
2728
private $receiver;
2829
private $sender;
30+
private $logger;
2931

30-
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection)
32+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null)
3133
{
3234
$this->encoder = $encoder;
3335
$this->decoder = $decoder;
3436
$this->connection = $connection;
37+
$this->logger = $logger;
3538
}
3639

3740
/**
@@ -60,7 +63,7 @@ public function send(Envelope $envelope): void
6063

6164
private function getReceiver()
6265
{
63-
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection);
66+
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection, $this->logger);
6467
}
6568

6669
private function getSender()

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Psr\Log\LoggerInterface;
1415
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
1516
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
1617
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
@@ -24,17 +25,19 @@ class AmqpTransportFactory implements TransportFactoryInterface
2425
private $encoder;
2526
private $decoder;
2627
private $debug;
28+
private $logger;
2729

28-
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug)
30+
public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug, LoggerInterface $logger = null)
2931
{
3032
$this->encoder = $encoder;
3133
$this->decoder = $decoder;
3234
$this->debug = $debug;
35+
$this->logger = $logger;
3336
}
3437

3538
public function createTransport(string $dsn, array $options): TransportInterface
3639
{
37-
return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
40+
return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug), $this->logger);
3841
}
3942

4043
public function supports(string $dsn, array $options): bool

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.