Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ec8cb3f

Browse filesBrowse files
committed
bug #57112 [Messenger] Handle AMQPConnectionException when publishing a message (jwage)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] Handle `AMQPConnectionException` when publishing a message | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Issues | backport of #54167 | License | MIT Commits ------- e10aa0e [Messenger] [Amqp] Handle AMQPConnectionException when publishing a message.
2 parents db1bda4 + e10aa0e commit ec8cb3f
Copy full SHA for ec8cb3f

File tree

2 files changed

+108
-15
lines changed
Filter options

2 files changed

+108
-15
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -813,6 +813,73 @@ public function testItCanBeConstructedWithTLSOptionsAndNonTLSDsn()
813813
);
814814
}
815815

816+
public function testItCanRetryPublishWhenAMQPConnectionExceptionIsThrown()
817+
{
818+
$factory = new TestAmqpFactory(
819+
$amqpConnection = $this->createMock(\AMQPConnection::class),
820+
$amqpChannel = $this->createMock(\AMQPChannel::class),
821+
$amqpQueue = $this->createMock(\AMQPQueue::class),
822+
$amqpExchange = $this->createMock(\AMQPExchange::class)
823+
);
824+
825+
$amqpExchange->expects($this->exactly(2))
826+
->method('publish')
827+
->willReturnOnConsecutiveCalls(
828+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
829+
null
830+
);
831+
832+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
833+
$connection->publish('body');
834+
}
835+
836+
public function testItCanRetryPublishWithDelayWhenAMQPConnectionExceptionIsThrown()
837+
{
838+
$factory = new TestAmqpFactory(
839+
$amqpConnection = $this->createMock(\AMQPConnection::class),
840+
$amqpChannel = $this->createMock(\AMQPChannel::class),
841+
$amqpQueue = $this->createMock(\AMQPQueue::class),
842+
$amqpExchange = $this->createMock(\AMQPExchange::class)
843+
);
844+
845+
$amqpExchange->expects($this->exactly(2))
846+
->method('publish')
847+
->willReturnOnConsecutiveCalls(
848+
$this->throwException(new \AMQPConnectionException('a socket error occurred')),
849+
null
850+
);
851+
852+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
853+
$connection->publish('body', [], 5000);
854+
}
855+
856+
public function testItWillRetryMaxThreeTimesWhenAMQPConnectionExceptionIsThrown()
857+
{
858+
$factory = new TestAmqpFactory(
859+
$amqpConnection = $this->createMock(\AMQPConnection::class),
860+
$amqpChannel = $this->createMock(\AMQPChannel::class),
861+
$amqpQueue = $this->createMock(\AMQPQueue::class),
862+
$amqpExchange = $this->createMock(\AMQPExchange::class)
863+
);
864+
865+
$exception = new \AMQPConnectionException('a socket error occurred');
866+
867+
$amqpExchange->expects($this->exactly(4))
868+
->method('publish')
869+
->willReturnOnConsecutiveCalls(
870+
$this->throwException($exception),
871+
$this->throwException($exception),
872+
$this->throwException($exception),
873+
$this->throwException($exception)
874+
);
875+
876+
self::expectException(get_class($exception));
877+
self::expectExceptionMessage($exception->getMessage());
878+
879+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
880+
$connection->publish('body');
881+
}
882+
816883
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
817884
{
818885
$amqpConnection = $this->createMock(\AMQPConnection::class);

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+41-15Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -306,19 +306,21 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, ?
306306
$this->setupExchangeAndQueues(); // also setup normal exchange for delayed messages so delay queue can DLX messages to it
307307
}
308308

309-
if (0 !== $delayInMs) {
310-
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
309+
$this->withConnectionExceptionRetry(function () use ($body, $headers, $delayInMs, $amqpStamp) {
310+
if (0 !== $delayInMs) {
311+
$this->publishWithDelay($body, $headers, $delayInMs, $amqpStamp);
311312

312-
return;
313-
}
313+
return;
314+
}
314315

315-
$this->publishOnExchange(
316-
$this->exchange(),
317-
$body,
318-
$this->getRoutingKeyForMessage($amqpStamp),
319-
$headers,
320-
$amqpStamp
321-
);
316+
$this->publishOnExchange(
317+
$this->exchange(),
318+
$body,
319+
$this->getRoutingKeyForMessage($amqpStamp),
320+
$headers,
321+
$amqpStamp
322+
);
323+
});
322324
}
323325

324326
/**
@@ -570,13 +572,18 @@ public function exchange(): \AMQPExchange
570572
private function clearWhenDisconnected(): void
571573
{
572574
if (!$this->channel()->isConnected()) {
573-
$this->amqpChannel = null;
574-
$this->amqpQueues = [];
575-
$this->amqpExchange = null;
576-
$this->amqpDelayExchange = null;
575+
$this->clear();
577576
}
578577
}
579578

579+
private function clear(): void
580+
{
581+
$this->amqpChannel = null;
582+
$this->amqpQueues = [];
583+
$this->amqpExchange = null;
584+
$this->amqpDelayExchange = null;
585+
}
586+
580587
private function getDefaultPublishRoutingKey(): ?string
581588
{
582589
return $this->exchangeOptions['default_publish_routing_key'] ?? null;
@@ -593,6 +600,25 @@ private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
593600
{
594601
return (null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey();
595602
}
603+
604+
private function withConnectionExceptionRetry(callable $callable): void
605+
{
606+
$maxRetries = 3;
607+
$retries = 0;
608+
609+
retry:
610+
try {
611+
$callable();
612+
} catch (\AMQPConnectionException $e) {
613+
if (++$retries <= $maxRetries) {
614+
$this->clear();
615+
616+
goto retry;
617+
}
618+
619+
throw $e;
620+
}
621+
}
596622
}
597623

598624
if (!class_exists(\Symfony\Component\Messenger\Transport\AmqpExt\Connection::class, false)) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.