Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 98eaef7

Browse filesBrowse files
committed
messenger: multiple failed transports support
[Messenger] Multiple failure transports support fix case when there is no failure transport defined avoid BC rebase with master php-cs-fix add multiple failed transport support for failed commands add support for specific failed transports with failed commands
1 parent 4dabd00 commit 98eaef7
Copy full SHA for 98eaef7

22 files changed

+620
-58
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,6 +1146,10 @@ function ($a) {
11461146
->prototype('variable')
11471147
->end()
11481148
->end()
1149+
->scalarNode('failure_transport')
1150+
->defaultNull()
1151+
->info('Transport name to send failed messages to (after all retries have failed).')
1152+
->end()
11491153
->arrayNode('retry_strategy')
11501154
->addDefaultsIfNotSet()
11511155
->beforeNormalization()

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+56-6Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,7 +1755,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17551755
}
17561756

17571757
$sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);
1758-
17591758
$container->getDefinition('messenger.senders_locator')
17601759
->replaceArgument(0, $messageToSendersMapping)
17611760
->replaceArgument(1, $sendersServiceLocator)
@@ -1768,24 +1767,75 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17681767
$container->getDefinition('messenger.retry_strategy_locator')
17691768
->replaceArgument(0, $transportRetryReferences);
17701769

1770+
$hasAnyFailureTransport = false;
1771+
1772+
$failureTransports = [];
1773+
$failureTransportsServiceLocatorId = 'messenger.failure_transports.locator';
1774+
$failureTransportsByName = [];
1775+
$failureTransportsByNameServiceLocatorId = 'messenger.failure_transports_by_name.locator';
1776+
17711777
if ($config['failure_transport']) {
17721778
if (!isset($senderReferences[$config['failure_transport']])) {
17731779
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
17741780
}
17751781

1782+
$hasAnyFailureTransport = true;
1783+
$failureTransportRef = $senderReferences[$config['failure_transport']];
1784+
$failureTransportsByName[$config['failure_transport']] = $failureTransportRef;
1785+
1786+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1787+
->replaceArgument(0, $failureTransportRef);
1788+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1789+
->replaceArgument(2, null);
1790+
}
1791+
1792+
foreach ($config['transports'] as $name => $transport) {
1793+
if ($transport['failure_transport']) {
1794+
if (!isset($config['transports'][$transport['failure_transport']])) {
1795+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
1796+
}
1797+
1798+
$hasAnyFailureTransport = true;
1799+
$failureTransports[$name] = $senderReferences[$transport['failure_transport']];
1800+
$failureTransportsByName[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']];
1801+
}
1802+
}
1803+
1804+
if ($hasAnyFailureTransport) {
1805+
$failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId);
1806+
$container->getDefinition($failureTransportsServiceLocatorId)
1807+
->replaceArgument(0, $failureTransports)
1808+
->replaceArgument(1, $failureTransportsServiceLocator);
17761809
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1777-
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
1810+
->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null);
1811+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1812+
->replaceArgument(2, \count($failureTransports) > 0 ? $failureTransportsServiceLocator : null);
1813+
1814+
$failureTransportsByNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByName, $failureTransportsByNameServiceLocatorId);
1815+
$container->getDefinition($failureTransportsByNameServiceLocatorId)
1816+
->replaceArgument(0, $failureTransportsByName)
1817+
->replaceArgument(1, $failureTransportsByNameServiceLocator);
1818+
17781819
$container->getDefinition('console.command.messenger_failed_messages_retry')
1779-
->replaceArgument(0, $config['failure_transport']);
1820+
->replaceArgument(0, $config['failure_transport'] ?? null)
1821+
->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null)
1822+
->replaceArgument(5, $container->getDefinition($failureTransportsByNameServiceLocatorId));
1823+
17801824
$container->getDefinition('console.command.messenger_failed_messages_show')
1781-
->replaceArgument(0, $config['failure_transport']);
1825+
->replaceArgument(0, $config['failure_transport'] ?? null)
1826+
->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null)
1827+
->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId));
1828+
17821829
$container->getDefinition('console.command.messenger_failed_messages_remove')
1783-
->replaceArgument(0, $config['failure_transport']);
1830+
->replaceArgument(0, $config['failure_transport'] ?? null)
1831+
->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null)
1832+
->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId));;
1833+
17841834
} else {
1785-
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
17861835
$container->removeDefinition('console.command.messenger_failed_messages_retry');
17871836
$container->removeDefinition('console.command.messenger_failed_messages_show');
17881837
$container->removeDefinition('console.command.messenger_failed_messages_remove');
1838+
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
17891839
}
17901840
}
17911841

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,20 +119,23 @@
119119
<argument type="service" id="messenger.routable_message_bus" />
120120
<argument type="service" id="event_dispatcher" />
121121
<argument type="service" id="logger" />
122+
<argument id="messenger.failure_transports_by_name.locator" /> <!-- Failed transports service locator -->
122123

123124
<tag name="console.command" command="messenger:failed:retry" />
124125
</service>
125126

126127
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
127128
<argument /> <!-- Receiver name -->
128129
<argument /> <!-- Receiver -->
130+
<argument id="messenger.failure_transports_by_name.locator" /> <!-- Failed transports service locator -->
129131

130132
<tag name="console.command" command="messenger:failed:show" />
131133
</service>
132134

133135
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
134136
<argument /> <!-- Receiver name -->
135137
<argument /> <!-- Receiver -->
138+
<argument id="messenger.failure_transports_by_name.locator" /> <!-- Failed transports service locator -->
136139

137140
<tag name="console.command" command="messenger:failed:remove" />
138141
</service>

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+16-1Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,20 @@
8383

8484
<service id="messenger.transport.sqs.factory" class="Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory"/>
8585

86+
<!-- failure transports locator per transport -->
87+
<service id="messenger.failure_transports.locator" class="Symfony\Component\DependencyInjection\ServiceLocator">
88+
<tag name="container.service_locator" />
89+
<argument type="collection" /> <!-- failure transports map by transport -->
90+
<argument /> <!-- failure transports locator -->
91+
</service>
92+
93+
<!-- failure transports locator per name -->
94+
<service id="messenger.failure_transports_by_name.locator" class="Symfony\Component\DependencyInjection\ServiceLocator">
95+
<tag name="container.service_locator" />
96+
<argument type="collection" /> <!-- failure transports map by name -->
97+
<argument /> <!-- failure transports locator -->
98+
</service>
99+
86100
<!-- retry -->
87101
<service id="messenger.retry_strategy_locator">
88102
<tag name="container.service_locator" />
@@ -108,8 +122,9 @@
108122
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
109123
<tag name="kernel.event_subscriber" />
110124
<tag name="monolog.logger" channel="messenger" />
111-
<argument /> <!-- Failure transport -->
125+
<argument /> <!-- Global Failure transport -->
112126
<argument type="service" id="logger" on-invalid="ignore" />
127+
<argument /> <!-- Failure transport inside each Transport -->
113128
</service>
114129

115130
<service id="messenger.listener.dispatch_pcntl_signal_listener" class="Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener">

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,7 @@
451451
<xsd:attribute name="name" type="xsd:string" />
452452
<xsd:attribute name="serializer" type="xsd:string" />
453453
<xsd:attribute name="dsn" type="xsd:string" />
454+
<xsd:attribute name="failure-transport" type="xsd:string" />
454455
</xsd:complexType>
455456

456457
<xsd:complexType name="messenger_retry_strategy">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'transports' => [
6+
'transport_1' => [
7+
'dsn' => 'null://',
8+
'failure_transport' => 'failure_transport_1'
9+
],
10+
'transport_2' => 'null://',
11+
'transport_3' => [
12+
'dsn' => 'null://',
13+
'failure_transport' => 'failure_transport_3'
14+
],
15+
'failure_transport_1' => 'null://',
16+
'failure_transport_3' => 'null://'
17+
],
18+
],
19+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'failure_transport' => 'failure_transport_global',
6+
'transports' => [
7+
'transport_1' => [
8+
'dsn' => 'null://',
9+
'failure_transport' => 'failure_transport_1'
10+
],
11+
'transport_2' => 'null://',
12+
'transport_3' => [
13+
'dsn' => 'null://',
14+
'failure_transport' => 'failure_transport_3'
15+
],
16+
'failure_transport_global' => 'null://',
17+
'failure_transport_1' => 'null://',
18+
'failure_transport_3' => 'null://',
19+
],
20+
],
21+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:transport name="transport_1" dsn="null://" failure_transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure_transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_1" dsn="null://" />
14+
<framework:transport name="failure_transport_3" dsn="null://" />
15+
</framework:messenger>
16+
</framework:config>
17+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger failure_transport="failure_transport_global">
10+
<framework:transport name="transport_1" dsn="null://" failure_transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure_transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_global" dsn="null://" />
14+
<framework:transport name="failure_transport_1" dsn="null://" />
15+
<framework:transport name="failure_transport_3" dsn="null://" />
16+
</framework:messenger>
17+
</framework:config>
18+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
framework:
2+
messenger:
3+
transports:
4+
transport_1:
5+
dsn: 'null://'
6+
failure_transport: failure_transport_1
7+
transport_2: 'null://'
8+
transport_3:
9+
dsn: 'null://'
10+
failure_transport: failure_transport_3
11+
failure_transport_1: 'null://'
12+
failure_transport_3: 'null://'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
framework:
2+
messenger:
3+
failure_transport: failure_transport_global
4+
transports:
5+
transport_1:
6+
dsn: 'null://'
7+
failure_transport: failure_transport_1
8+
transport_2: 'null://'
9+
transport_3:
10+
dsn: 'null://'
11+
failure_transport: failure_transport_3
12+
failure_transport_global: 'null://'
13+
failure_transport_1: 'null://'
14+
failure_transport_3: 'null://'

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+67Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,73 @@ public function testMessenger()
599599
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
600600
}
601601

602+
public function testMessengerMultipleFailureTransports()
603+
{
604+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');
605+
$failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator');
606+
607+
/** @var Reference $failureTransportsMapping */
608+
$failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0);
609+
610+
// transport 2 exists but does not appear in the mapping
611+
$expectedFailureTransportsMapping = [
612+
'transport_1' => 'failure_transport_1',
613+
'transport_3' => 'failure_transport_3',
614+
];
615+
616+
$failedTransports = [
617+
'failure_transport_1',
618+
'failure_transport_3',
619+
];
620+
621+
foreach ($failureTransportsMapping as $transportName => $ref) {
622+
if (\in_array($transportName, $failedTransports)) {
623+
continue;
624+
}
625+
626+
$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));
627+
}
628+
629+
$failedMessageTransportListenerReference =
630+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
631+
$this->assertNull($failedMessageTransportListenerReference->getArgument(0));
632+
$this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2));
633+
}
634+
635+
public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
636+
{
637+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');
638+
$failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator');
639+
640+
/** @var Reference $failureTransportsMapping */
641+
$failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0);
642+
643+
$expectedFailureTransportsMapping = [
644+
'transport_1' => 'failure_transport_1',
645+
'transport_2' => 'failure_transport_global',
646+
'transport_3' => 'failure_transport_3',
647+
];
648+
649+
$failed_transports = [
650+
'failure_transport_global',
651+
'failure_transport_1',
652+
'failure_transport_3',
653+
];
654+
655+
foreach ($failureTransportsMapping as $transportName => $ref) {
656+
if (\in_array($transportName, $failed_transports)) {
657+
continue;
658+
}
659+
660+
$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));
661+
}
662+
663+
$failedMessageTransportListenerReference =
664+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
665+
$this->assertSame((string) (new Reference('messenger.transport.failure_transport_global')), (string) $failedMessageTransportListenerReference->getArgument(0));
666+
$this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2));
667+
}
668+
602669
public function testMessengerTransports()
603670
{
604671
$container = $this->createContainerFromFile('messenger_transports');

‎src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+19-5Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Console\Command\Command;
1515
use Symfony\Component\Console\Helper\Dumper;
1616
use Symfony\Component\Console\Style\SymfonyStyle;
17+
use Symfony\Component\DependencyInjection\ServiceLocator;
1718
use Symfony\Component\Messenger\Envelope;
1819
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1920
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
@@ -30,18 +31,27 @@ abstract class AbstractFailedMessagesCommand extends Command
3031
{
3132
private $receiverName;
3233
private $receiver;
34+
/**
35+
* @var ServiceLocator
36+
*/
37+
private $failureTransports;
3338

34-
public function __construct(string $receiverName, ReceiverInterface $receiver)
39+
public function __construct(?string $receiverName, ?ReceiverInterface $receiver, ServiceLocator $failureTransports)
3540
{
3641
$this->receiverName = $receiverName;
3742
$this->receiver = $receiver;
43+
$this->failureTransports = $failureTransports;
3844

3945
parent::__construct();
4046
}
4147

42-
protected function getReceiverName(): string
48+
protected function getReceiverName(?string $name): string
4349
{
44-
return $this->receiverName;
50+
if ($name === null) {
51+
return $this->receiverName;
52+
}
53+
54+
return $name;
4555
}
4656

4757
/**
@@ -115,9 +125,13 @@ protected function printPendingMessagesMessage(ReceiverInterface $receiver, Symf
115125
}
116126
}
117127

118-
protected function getReceiver(): ReceiverInterface
128+
protected function getReceiver(?string $name): ReceiverInterface
119129
{
120-
return $this->receiver;
130+
if ($name === null) {
131+
return $this->receiver;
132+
}
133+
134+
return $this->failureTransports->get($name);
121135
}
122136

123137
protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.