Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f123370

Browse filesBrowse files
jwagechalasr
authored andcommitted
[Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
1 parent 182e93e commit f123370
Copy full SHA for f123370

File tree

2 files changed

+106
-13
lines changed
Filter options

2 files changed

+106
-13
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
774774
);
775775
}
776776

777+
public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
778+
{
779+
$factory = new TestAmqpFactory(
780+
$amqpConnection = $this->createMock(\AMQPConnection::class),
781+
$amqpChannel = $this->createMock(\AMQPChannel::class),
782+
$amqpQueue = $this->createMock(\AMQPQueue::class),
783+
$amqpExchange = $this->createMock(\AMQPExchange::class)
784+
);
785+
786+
$amqpExchange->expects($this->exactly(2))
787+
->method('publish')
788+
->willReturnOnConsecutiveCalls(
789+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
790+
null
791+
);
792+
793+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
794+
$connection->publish('body');
795+
}
796+
797+
public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
798+
{
799+
$factory = new TestAmqpFactory(
800+
$amqpConnection = $this->createMock(\AMQPConnection::class),
801+
$amqpChannel = $this->createMock(\AMQPChannel::class),
802+
$amqpQueue = $this->createMock(\AMQPQueue::class),
803+
$amqpExchange = $this->createMock(\AMQPExchange::class)
804+
);
805+
806+
$amqpExchange->expects($this->exactly(2))
807+
->method('publish')
808+
->willReturnOnConsecutiveCalls(
809+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
810+
null
811+
);
812+
813+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
814+
$connection->publish('body', [], 5000);
815+
}
816+
817+
public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
818+
{
819+
$factory = new TestAmqpFactory(
820+
$amqpConnection = $this->createMock(\AMQPConnection::class),
821+
$amqpChannel = $this->createMock(\AMQPChannel::class),
822+
$amqpQueue = $this->createMock(\AMQPQueue::class),
823+
$amqpExchange = $this->createMock(\AMQPExchange::class)
824+
);
825+
826+
$exception = new \AMQPConnectionException('a socket error occurred');
827+
828+
$amqpExchange->expects($this->exactly(4))
829+
->method('publish')
830+
->willReturnOnConsecutiveCalls(
831+
$this->throwException($exception),
832+
$this->throwException($exception),
833+
$this->throwException($exception),
834+
$this->throwException($exception),
835+
);
836+
837+
self::expectException($exception::class);
838+
self::expectExceptionMessage($exception->getMessage());
839+
840+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
841+
$connection->publish('body');
842+
}
843+
777844
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
778845
{
779846
$amqpConnection = $this->createMock(\AMQPConnection::class);

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+39-13Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
287287
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
288288
}
289289

290-
if (0 !== $delayInMs) {
291-
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
290+
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
291+
if (0 !== $delayInMs) {
292+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
292293

293-
return;
294-
}
294+
return;
295+
}
295296

296-
$this->publishOnExchange(
297-
$this->exchange(),
298-
$body,
299-
$this->getRoutingKeyForMessage($amqpStamp),
300-
$headers,
301-
$amqpStamp
302-
);
297+
$this->publishOnExchange(
298+
$this->exchange(),
299+
$body,
300+
$this->getRoutingKeyForMessage($amqpStamp),
301+
$headers,
302+
$amqpStamp
303+
);
304+
});
303305
}
304306

305307
/**
@@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange
545547
private function clearWhenDisconnected(): void
546548
{
547549
if (!$this->channel()->isConnected()) {
548-
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
549-
$this->amqpQueues = [];
550+
$this->clear();
550551
}
551552
}
552553

554+
private function clear(): void
555+
{
556+
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557+
$this->amqpQueues = [];
558+
}
559+
553560
private function getDefaultPublishRoutingKey(): ?string
554561
{
555562
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
566573
{
567574
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
568575
}
576+
577+
private function withConnectionExceptionRetry(callable $callable): void
578+
{
579+
$maxRetries = 3;
580+
$retries = 0;
581+
582+
retry:
583+
try {
584+
$callable();
585+
} catch (\AMQPConnectionException $e) {
586+
if (++$retries <= $maxRetries) {
587+
$this->clear();
588+
589+
goto retry;
590+
}
591+
592+
throw $e;
593+
}
594+
}
569595
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.