Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e43b198

Browse filesBrowse files
committed
bug symfony#54167 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. (jwage)
This PR was squashed before being merged into the 6.4 branch. Discussion ---------- [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message. | Q | A | ------------- | --- | Branch? | 6.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | Fix symfony#36538 Fix symfony#48241 | License | MIT If you have a message handler that dispatches messages to another queue, you can encounter `AMQPConnectionException` with the message "Library error: a SSL error occurred" or "a socket error occurred" depending on if you are using tls or not or if you are running behind a load balancer or not. You can manually reproduce this issue by dispatching a message where the handler then dispatches another message to a different queue, then go to rabbitmq admin and close the connection manually, then dispatch another message and when the message handler goes to dispatch the other message, you will get this exception: ``` a socket error occurred #0 /vagrant/vendor/symfony/amqp-messenger/Transport/AmqpTransport.php(60): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender->send() #1 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(62): Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransport->send() #2 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle() #3 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(61): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle() #4 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle() #5 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle() #6 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle() #7 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle() #8 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch() #9 /vagrant/src/Messenger/MessageBus.php(37): Symfony\Component\Messenger\TraceableMessageBus->dispatch() #10 /vagrant/vendor/symfony/mailer/Mailer.php(66): App\Messenger\MessageBus->dispatch() #11 /vagrant/src/Mailer/Mailer.php(83): Symfony\Component\Mailer\Mailer->send() #12 /vagrant/src/Mailer/Mailer.php(96): App\Mailer\Mailer->send() #13 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(118): App\Mailer\Mailer->sendEmail() #14 /vagrant/src/MessageHandler/Trading/StrategySubscriptionMessageHandler.php(72): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->handle() #15 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(152): App\MessageHandler\Trading\StrategySubscriptionMessageHandler->__invoke() symfony#16 /vagrant/vendor/symfony/messenger/Middleware/HandleMessageMiddleware.php(91): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->callHandler() symfony#17 /vagrant/vendor/symfony/messenger/Middleware/SendMessageMiddleware.php(71): Symfony\Component\Messenger\Middleware\HandleMessageMiddleware->handle() symfony#18 /vagrant/vendor/symfony/messenger/Middleware/FailedMessageProcessingMiddleware.php(34): Symfony\Component\Messenger\Middleware\SendMessageMiddleware->handle() symfony#19 /vagrant/vendor/symfony/messenger/Middleware/DispatchAfterCurrentBusMiddleware.php(68): Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware->handle() symfony#20 /vagrant/vendor/symfony/messenger/Middleware/RejectRedeliveredMessageMiddleware.php(41): Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware->handle() symfony#21 /vagrant/vendor/symfony/messenger/Middleware/AddBusNameStampMiddleware.php(37): Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware->handle() symfony#22 /vagrant/vendor/symfony/messenger/Middleware/TraceableMiddleware.php(40): Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware->handle() symfony#23 /vagrant/vendor/symfony/messenger/MessageBus.php(70): Symfony\Component\Messenger\Middleware\TraceableMiddleware->handle() symfony#24 /vagrant/vendor/symfony/messenger/TraceableMessageBus.php(38): Symfony\Component\Messenger\MessageBus->dispatch() symfony#25 /vagrant/vendor/symfony/messenger/RoutableMessageBus.php(54): Symfony\Component\Messenger\TraceableMessageBus->dispatch() symfony#26 /vagrant/vendor/symfony/messenger/Worker.php(162): Symfony\Component\Messenger\RoutableMessageBus->dispatch() symfony#27 /vagrant/vendor/symfony/messenger/Worker.php(109): Symfony\Component\Messenger\Worker->handleMessage() symfony#28 /vagrant/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php(238): Symfony\Component\Messenger\Worker->run() symfony#29 /vagrant/vendor/symfony/console/Command/Command.php(326): Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() symfony#30 /vagrant/vendor/symfony/console/Application.php(1096): Symfony\Component\Console\Command\Command->run() symfony#31 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(126): Symfony\Component\Console\Application->doRunCommand() symfony#32 /vagrant/vendor/symfony/console/Application.php(324): Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() symfony#33 /vagrant/vendor/symfony/framework-bundle/Console/Application.php(80): Symfony\Component\Console\Application->doRun() symfony#34 /vagrant/vendor/symfony/console/Application.php(175): Symfony\Bundle\FrameworkBundle\Console\Application->doRun() symfony#35 /vagrant/vendor/symfony/runtime/Runner/Symfony/ConsoleApplicationRunner.php(49): Symfony\Component\Console\Application->run() symfony#36 /vagrant/vendor/autoload_runtime.php(29): Symfony\Component\Runtime\Runner\Symfony\ConsoleApplicationRunner->run() symfony#37 /vagrant/bin/console(11): require_once('...') symfony#38 {main} ``` TODO: - [x] Add test for retry logic when publishing messages Commits ------- f123370 [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
2 parents 0523300 + f123370 commit e43b198
Copy full SHA for e43b198

File tree

Expand file treeCollapse file tree

2 files changed

+106
-13
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+106
-13
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -774,6 +774,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
774774
);
775775
}
776776

777+
public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
778+
{
779+
$factory = new TestAmqpFactory(
780+
$amqpConnection = $this->createMock(\AMQPConnection::class),
781+
$amqpChannel = $this->createMock(\AMQPChannel::class),
782+
$amqpQueue = $this->createMock(\AMQPQueue::class),
783+
$amqpExchange = $this->createMock(\AMQPExchange::class)
784+
);
785+
786+
$amqpExchange->expects($this->exactly(2))
787+
->method('publish')
788+
->willReturnOnConsecutiveCalls(
789+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
790+
null
791+
);
792+
793+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
794+
$connection->publish('body');
795+
}
796+
797+
public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
798+
{
799+
$factory = new TestAmqpFactory(
800+
$amqpConnection = $this->createMock(\AMQPConnection::class),
801+
$amqpChannel = $this->createMock(\AMQPChannel::class),
802+
$amqpQueue = $this->createMock(\AMQPQueue::class),
803+
$amqpExchange = $this->createMock(\AMQPExchange::class)
804+
);
805+
806+
$amqpExchange->expects($this->exactly(2))
807+
->method('publish')
808+
->willReturnOnConsecutiveCalls(
809+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
810+
null
811+
);
812+
813+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
814+
$connection->publish('body', [], 5000);
815+
}
816+
817+
public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
818+
{
819+
$factory = new TestAmqpFactory(
820+
$amqpConnection = $this->createMock(\AMQPConnection::class),
821+
$amqpChannel = $this->createMock(\AMQPChannel::class),
822+
$amqpQueue = $this->createMock(\AMQPQueue::class),
823+
$amqpExchange = $this->createMock(\AMQPExchange::class)
824+
);
825+
826+
$exception = new \AMQPConnectionException('a socket error occurred');
827+
828+
$amqpExchange->expects($this->exactly(4))
829+
->method('publish')
830+
->willReturnOnConsecutiveCalls(
831+
$this->throwException($exception),
832+
$this->throwException($exception),
833+
$this->throwException($exception),
834+
$this->throwException($exception),
835+
);
836+
837+
self::expectException($exception::class);
838+
self::expectExceptionMessage($exception->getMessage());
839+
840+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
841+
$connection->publish('body');
842+
}
843+
777844
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
778845
{
779846
$amqpConnection = $this->createMock(\AMQPConnection::class);

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+39-13Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -287,19 +287,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
287287
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
288288
}
289289

290-
if (0 !== $delayInMs) {
291-
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
290+
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
291+
if (0 !== $delayInMs) {
292+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
292293

293-
return;
294-
}
294+
return;
295+
}
295296

296-
$this->publishOnExchange(
297-
$this->exchange(),
298-
$body,
299-
$this->getRoutingKeyForMessage($amqpStamp),
300-
$headers,
301-
$amqpStamp
302-
);
297+
$this->publishOnExchange(
298+
$this->exchange(),
299+
$body,
300+
$this->getRoutingKeyForMessage($amqpStamp),
301+
$headers,
302+
$amqpStamp
303+
);
304+
});
303305
}
304306

305307
/**
@@ -545,11 +547,16 @@ public function exchange(): \AMQPExchange
545547
private function clearWhenDisconnected(): void
546548
{
547549
if (!$this->channel()->isConnected()) {
548-
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
549-
$this->amqpQueues = [];
550+
$this->clear();
550551
}
551552
}
552553

554+
private function clear(): void
555+
{
556+
unset($this->amqpChannel, $this->amqpExchange, $this->amqpDelayExchange);
557+
$this->amqpQueues = [];
558+
}
559+
553560
private function getDefaultPublishRoutingKey(): ?string
554561
{
555562
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@@ -566,4 +573,23 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
566573
{
567574
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
568575
}
576+
577+
private function withConnectionExceptionRetry(callable $callable): void
578+
{
579+
$maxRetries = 3;
580+
$retries = 0;
581+
582+
retry:
583+
try {
584+
$callable();
585+
} catch (\AMQPConnectionException $e) {
586+
if (++$retries <= $maxRetries) {
587+
$this->clear();
588+
589+
goto retry;
590+
}
591+
592+
throw $e;
593+
}
594+
}
569595
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.