Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 5810b6c

Browse filesBrowse files
monteirochalasr
authored andcommitted
Messenger multiple failed transports
1 parent 2dcf313 commit 5810b6c
Copy full SHA for 5810b6c

24 files changed

+1498
-71
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,10 @@ function ($a) {
13131313
->prototype('variable')
13141314
->end()
13151315
->end()
1316+
->scalarNode('failure_transport')
1317+
->defaultNull()
1318+
->info('Transport name to send failed messages to (after all retries have failed).')
1319+
->end()
13161320
->arrayNode('retry_strategy')
13171321
->addDefaultsIfNotSet()
13181322
->beforeNormalization()

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+42-9Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1915,15 +1915,38 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19151915
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
19161916
}
19171917

1918+
$failureTransports = [];
1919+
if ($config['failure_transport']) {
1920+
if (!isset($config['transports'][$config['failure_transport']])) {
1921+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1922+
}
1923+
1924+
$container->setAlias('messenger.failure_transports.default', 'messenger.transport.'.$config['failure_transport']);
1925+
$failureTransports[] = $config['failure_transport'];
1926+
}
1927+
1928+
$failureTransportsByName = [];
1929+
foreach ($config['transports'] as $name => $transport) {
1930+
if ($transport['failure_transport']) {
1931+
$failureTransports[] = $transport['failure_transport'];
1932+
$failureTransportsByName[$name] = $transport['failure_transport'];
1933+
} elseif ($config['failure_transport']) {
1934+
$failureTransportsByName[$name] = $config['failure_transport'];
1935+
}
1936+
}
1937+
19181938
$senderAliases = [];
19191939
$transportRetryReferences = [];
19201940
foreach ($config['transports'] as $name => $transport) {
19211941
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
1922-
19231942
$transportDefinition = (new Definition(TransportInterface::class))
19241943
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
19251944
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1926-
->addTag('messenger.receiver', ['alias' => $name])
1945+
->addTag('messenger.receiver', [
1946+
'alias' => $name,
1947+
'is_failure_transport' => \in_array($name, $failureTransports),
1948+
]
1949+
)
19271950
;
19281951
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
19291952
$senderAliases[$name] = $transportId;
@@ -1954,6 +1977,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19541977
$senderReferences[$serviceId] = new Reference($serviceId);
19551978
}
19561979

1980+
foreach ($config['transports'] as $name => $transport) {
1981+
if ($transport['failure_transport']) {
1982+
if (!isset($senderReferences[$transport['failure_transport']])) {
1983+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
1984+
}
1985+
}
1986+
}
1987+
1988+
$failureTransportReferencesByTransportName = array_map(function ($failureTransportName) use ($senderReferences) {
1989+
return $senderReferences[$failureTransportName];
1990+
}, $failureTransportsByName);
1991+
19571992
$messageToSendersMapping = [];
19581993
foreach ($config['routing'] as $message => $messageConfiguration) {
19591994
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
@@ -1984,19 +2019,17 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19842019
$container->getDefinition('messenger.retry_strategy_locator')
19852020
->replaceArgument(0, $transportRetryReferences);
19862021

1987-
if ($config['failure_transport']) {
1988-
if (!isset($senderReferences[$config['failure_transport']])) {
1989-
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1990-
}
1991-
1992-
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1993-
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
2022+
if (\count($failureTransports) > 0) {
19942023
$container->getDefinition('console.command.messenger_failed_messages_retry')
19952024
->replaceArgument(0, $config['failure_transport']);
19962025
$container->getDefinition('console.command.messenger_failed_messages_show')
19972026
->replaceArgument(0, $config['failure_transport']);
19982027
$container->getDefinition('console.command.messenger_failed_messages_remove')
19992028
->replaceArgument(0, $config['failure_transport']);
2029+
2030+
$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportReferencesByTransportName);
2031+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
2032+
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
20002033
} else {
20012034
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
20022035
$container->removeDefinition('console.command.messenger_failed_messages_retry');

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
+6-6Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@
165165

166166
->set('console.command.messenger_failed_messages_retry', FailedMessagesRetryCommand::class)
167167
->args([
168-
abstract_arg('Receiver name'),
169-
abstract_arg('Receiver'),
168+
abstract_arg('Default failure receiver name'),
169+
abstract_arg('Receivers'),
170170
service('messenger.routable_message_bus'),
171171
service('event_dispatcher'),
172172
service('logger'),
@@ -175,15 +175,15 @@
175175

176176
->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
177177
->args([
178-
abstract_arg('Receiver name'),
179-
abstract_arg('Receiver'),
178+
abstract_arg('Default failure receiver name'),
179+
abstract_arg('Receivers'),
180180
])
181181
->tag('console.command')
182182

183183
->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
184184
->args([
185-
abstract_arg('Receiver name'),
186-
abstract_arg('Receiver'),
185+
abstract_arg('Default failure receiver name'),
186+
abstract_arg('Receivers'),
187187
])
188188
->tag('console.command')
189189

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@
172172

173173
->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
174174
->args([
175-
abstract_arg('failure transport'),
175+
abstract_arg('failure transports'),
176176
service('logger')->ignoreOnInvalid(),
177177
])
178178
->tag('kernel.event_subscriber')

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,7 @@
491491
<xsd:attribute name="name" type="xsd:string" />
492492
<xsd:attribute name="serializer" type="xsd:string" />
493493
<xsd:attribute name="dsn" type="xsd:string" />
494+
<xsd:attribute name="failure-transport" type="xsd:string" />
494495
</xsd:complexType>
495496

496497
<xsd:complexType name="messenger_retry_strategy">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'transports' => [
6+
'transport_1' => [
7+
'dsn' => 'null://',
8+
'failure_transport' => 'failure_transport_1'
9+
],
10+
'transport_2' => 'null://',
11+
'transport_3' => [
12+
'dsn' => 'null://',
13+
'failure_transport' => 'failure_transport_3'
14+
],
15+
'failure_transport_1' => 'null://',
16+
'failure_transport_3' => 'null://'
17+
],
18+
],
19+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'failure_transport' => 'failure_transport_global',
6+
'transports' => [
7+
'transport_1' => [
8+
'dsn' => 'null://',
9+
'failure_transport' => 'failure_transport_1'
10+
],
11+
'transport_2' => 'null://',
12+
'transport_3' => [
13+
'dsn' => 'null://',
14+
'failure_transport' => 'failure_transport_3'
15+
],
16+
'failure_transport_global' => 'null://',
17+
'failure_transport_1' => 'null://',
18+
'failure_transport_3' => 'null://',
19+
],
20+
],
21+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_1" dsn="null://" />
14+
<framework:transport name="failure_transport_3" dsn="null://" />
15+
</framework:messenger>
16+
</framework:config>
17+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger failure-transport="failure_transport_global">
10+
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_global" dsn="null://" />
14+
<framework:transport name="failure_transport_1" dsn="null://" />
15+
<framework:transport name="failure_transport_3" dsn="null://" />
16+
</framework:messenger>
17+
</framework:config>
18+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
framework:
2+
messenger:
3+
transports:
4+
transport_1:
5+
dsn: 'null://'
6+
failure_transport: failure_transport_1
7+
transport_2: 'null://'
8+
transport_3:
9+
dsn: 'null://'
10+
failure_transport: failure_transport_3
11+
failure_transport_1: 'null://'
12+
failure_transport_3: 'null://'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
framework:
2+
messenger:
3+
failure_transport: failure_transport_global
4+
transports:
5+
transport_1:
6+
dsn: 'null://'
7+
failure_transport: failure_transport_1
8+
transport_2: 'null://'
9+
transport_3:
10+
dsn: 'null://'
11+
failure_transport: failure_transport_3
12+
failure_transport_global: 'null://'
13+
failure_transport_1: 'null://'
14+
failure_transport_3: 'null://'

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+98-2Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
use Symfony\Component\Cache\Adapter\RedisTagAwareAdapter;
3333
use Symfony\Component\Cache\DependencyInjection\CachePoolPass;
3434
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
35+
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
3536
use Symfony\Component\DependencyInjection\ChildDefinition;
3637
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
3738
use Symfony\Component\DependencyInjection\Compiler\ResolveInstanceofConditionalsPass;
@@ -710,12 +711,92 @@ public function testMessenger()
710711
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
711712
}
712713

714+
public function testMessengerMultipleFailureTransports()
715+
{
716+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');
717+
718+
$failureTransport1Definition = $container->getDefinition('messenger.transport.failure_transport_1');
719+
$failureTransport1Tags = $failureTransport1Definition->getTag('messenger.receiver')[0];
720+
721+
$this->assertEquals([
722+
'alias' => 'failure_transport_1',
723+
'is_failure_transport' => true,
724+
], $failureTransport1Tags);
725+
726+
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
727+
$failureTransport3Tags = $failureTransport3Definition->getTag('messenger.receiver')[0];
728+
729+
$this->assertEquals([
730+
'alias' => 'failure_transport_3',
731+
'is_failure_transport' => true,
732+
], $failureTransport3Tags);
733+
734+
// transport 2 exists but does not appear in the mapping
735+
$this->assertFalse($container->hasDefinition('messenger.transport.failure_transport_2'));
736+
737+
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
738+
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
739+
$expectedTransportsByFailureTransports = [
740+
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
741+
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
742+
];
743+
744+
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
745+
$values = $serviceClosureArgument->getValues();
746+
747+
return array_shift($values);
748+
}, $failureTransports);
749+
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
750+
}
751+
752+
public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
753+
{
754+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');
755+
756+
$this->assertEquals('messenger.transport.failure_transport_global', (string) $container->getAlias('messenger.failure_transports.default'));
757+
758+
$failureTransport1Definition = $container->getDefinition('messenger.transport.failure_transport_1');
759+
$failureTransport1Tags = $failureTransport1Definition->getTag('messenger.receiver')[0];
760+
761+
$this->assertEquals([
762+
'alias' => 'failure_transport_1',
763+
'is_failure_transport' => true,
764+
], $failureTransport1Tags);
765+
766+
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
767+
$failureTransport3Tags = $failureTransport3Definition->getTag('messenger.receiver')[0];
768+
769+
$this->assertEquals([
770+
'alias' => 'failure_transport_3',
771+
'is_failure_transport' => true,
772+
], $failureTransport3Tags);
773+
774+
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
775+
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
776+
$expectedTransportsByFailureTransports = [
777+
'failure_transport_1' => new Reference('messenger.transport.failure_transport_global'),
778+
'failure_transport_3' => new Reference('messenger.transport.failure_transport_global'),
779+
'failure_transport_global' => new Reference('messenger.transport.failure_transport_global'),
780+
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
781+
'transport_2' => new Reference('messenger.transport.failure_transport_global'),
782+
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
783+
];
784+
785+
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
786+
$values = $serviceClosureArgument->getValues();
787+
788+
return array_shift($values);
789+
}, $failureTransports);
790+
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
791+
}
792+
713793
public function testMessengerTransports()
714794
{
715795
$container = $this->createContainerFromFile('messenger_transports');
716796
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
717797
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
718-
$this->assertEquals([['alias' => 'default']], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
798+
$this->assertEquals([
799+
['alias' => 'default', 'is_failure_transport' => false], ], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
719800
$transportArguments = $container->getDefinition('messenger.transport.default')->getArguments();
720801
$this->assertEquals(new Reference('messenger.default_serializer'), $transportArguments[2]);
721802

@@ -756,7 +837,22 @@ public function testMessengerTransports()
756837
$this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2));
757838
$this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3));
758839

759-
$this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0));
840+
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
841+
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
842+
$expectedTransportsByFailureTransports = [
843+
'beanstalkd' => new Reference('messenger.transport.failed'),
844+
'customised' => new Reference('messenger.transport.failed'),
845+
'default' => new Reference('messenger.transport.failed'),
846+
'failed' => new Reference('messenger.transport.failed'),
847+
'redis' => new Reference('messenger.transport.failed'),
848+
];
849+
850+
$failureTransportsReferences = array_map(function (ServiceClosureArgument $serviceClosureArgument) {
851+
$values = $serviceClosureArgument->getValues();
852+
853+
return array_shift($values);
854+
}, $failureTransports);
855+
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
760856
}
761857

762858
public function testMessengerRouting()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.