Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fdeb93e

Browse filesBrowse files
committed
feature #30970 [Messenger] Adding failure transport support (weaverryan)
This PR was squashed before being merged into the 4.3-dev branch (closes #30970). Discussion ---------- [Messenger] Adding failure transport support | Q | A | ------------- | --- | Branch? | master | Bug fix? | yes | New feature? | yes | BC breaks? | yes | Deprecations? | no | Tests pass? | yes | Fixed tickets | #31231 | License | MIT | Doc PR | symfony/symfony-docs#11236 This adds "failure" transport support for messenger, so that messages that fail on *all* their retries can be collected in one spot and retried later if wanted: ```yml framework: messenger: failure_transport: failed transports: async: dsn: 'amqp://' failed: dsn: 'doctrine://default?queue_name=failed' routing: 'App\Message\SmsNotification': async ``` In this setup, `SmsNotification` would be retried 3 times on the `async` transport (current behavior) and then finally sent to the `failed` transport. The `failed` transport can be consumed like a normal transport, but should usually be handled & consumed by one of the new commands: **> bin/console messenger:failed:show** <img width="861" alt="Screen Shot 2019-04-10 at 3 15 45 PM" src="https://user-images.githubusercontent.com/121003/55917329-ddc54280-5ba3-11e9-878c-af3c653643de.png"> **> bin/console messenger:failed:show 217** <img width="804" alt="Screen Shot 2019-04-10 at 3 15 55 PM" src="https://user-images.githubusercontent.com/121003/55917360-f33a6c80-5ba3-11e9-9f12-a8c57a9a7a4b.png"> **> bin/console messenger:failed:purge 217** <img width="835" alt="Screen Shot 2019-04-10 at 3 16 07 PM" src="https://user-images.githubusercontent.com/121003/55917383-ff262e80-5ba3-11e9-9720-e24176b834f7.png"> **> bin/console messenger:failed:retry 217** <img width="737" alt="Screen Shot 2019-04-10 at 3 16 29 PM" src="https://user-images.githubusercontent.com/121003/55917396-09482d00-5ba4-11e9-8d51-0bbe2b4ffc14.png"> **> bin/console messenger:failed:retry 218 -vv** <img width="1011" alt="Screen Shot 2019-04-10 at 3 20 39 PM" src="https://user-images.githubusercontent.com/121003/55917503-6512b600-5ba4-11e9-9365-4ac87d858541.png"> **Note** (This screenshot is ugly - need to make the dump of the message and the exception more attractive) Or you can run `bin/console messenger:failed:retry` without any argument, and it will consume the failed messages one-by-one and ask you if you want to retry/handle each. By passing Cheers! Commits ------- 36487e5 [Messenger] Adding failure transport support
2 parents 48e3f40 + 36487e5 commit fdeb93e
Copy full SHA for fdeb93e

File tree

49 files changed

+1839
-150
lines changed
Filter options

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner

49 files changed

+1839
-150
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,10 @@ function ($a) {
12001200
->end()
12011201
->end()
12021202
->end()
1203+
->scalarNode('failure_transport')
1204+
->defaultNull()
1205+
->info('Transport name to send failed messages to (after all retries have failed).')
1206+
->end()
12031207
->scalarNode('default_bus')->defaultNull()->end()
12041208
->arrayNode('buses')
12051209
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+34-5Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
use Symfony\Component\Console\Application;
4141
use Symfony\Component\Console\Command\Command;
4242
use Symfony\Component\DependencyInjection\Alias;
43-
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
4443
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
4544
use Symfony\Component\DependencyInjection\ChildDefinition;
45+
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
4646
use Symfony\Component\DependencyInjection\ContainerBuilder;
4747
use Symfony\Component\DependencyInjection\ContainerInterface;
4848
use Symfony\Component\DependencyInjection\Definition;
@@ -284,6 +284,9 @@ public function load(array $configs, ContainerBuilder $container)
284284
$container->removeDefinition('console.command.messenger_debug');
285285
$container->removeDefinition('console.command.messenger_stop_workers');
286286
$container->removeDefinition('console.command.messenger_setup_transports');
287+
$container->removeDefinition('console.command.messenger_failed_messages_retry');
288+
$container->removeDefinition('console.command.messenger_failed_messages_show');
289+
$container->removeDefinition('console.command.messenger_failed_messages_remove');
287290
}
288291

289292
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
@@ -1743,22 +1746,48 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17431746
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17441747
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
17451748
}
1746-
$senders = [];
1749+
1750+
// make sure senderAliases contains all senders
17471751
foreach ($messageConfiguration['senders'] as $sender) {
1748-
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
1752+
if (!isset($senderAliases[$sender])) {
1753+
$senderAliases[$sender] = $sender;
1754+
}
17491755
}
17501756

1751-
$messageToSendersMapping[$message] = new IteratorArgument($senders);
1757+
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
17521758
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
17531759
}
17541760

1761+
$senderReferences = [];
1762+
foreach ($senderAliases as $alias => $serviceId) {
1763+
$senderReferences[$alias] = new Reference($serviceId);
1764+
}
1765+
17551766
$container->getDefinition('messenger.senders_locator')
17561767
->replaceArgument(0, $messageToSendersMapping)
1757-
->replaceArgument(1, $messagesToSendAndHandle)
1768+
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
1769+
->replaceArgument(2, $messagesToSendAndHandle)
17581770
;
17591771

17601772
$container->getDefinition('messenger.retry_strategy_locator')
17611773
->replaceArgument(0, $transportRetryReferences);
1774+
1775+
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1776+
->replaceArgument(1, $config['failure_transport']);
1777+
1778+
if ($config['failure_transport']) {
1779+
$container->getDefinition('console.command.messenger_failed_messages_retry')
1780+
->replaceArgument(0, $config['failure_transport'])
1781+
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
1782+
$container->getDefinition('console.command.messenger_failed_messages_show')
1783+
->replaceArgument(0, $config['failure_transport']);
1784+
$container->getDefinition('console.command.messenger_failed_messages_remove')
1785+
->replaceArgument(0, $config['failure_transport']);
1786+
} else {
1787+
$container->removeDefinition('console.command.messenger_failed_messages_retry');
1788+
$container->removeDefinition('console.command.messenger_failed_messages_show');
1789+
$container->removeDefinition('console.command.messenger_failed_messages_remove');
1790+
}
17621791
}
17631792

17641793
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+25Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,31 @@
114114
<tag name="console.command" command="messenger:stop-workers" />
115115
</service>
116116

117+
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
118+
<argument /> <!-- Receiver name -->
119+
<argument /> <!-- Receiver locator -->
120+
<argument type="service" id="messenger.routable_message_bus" />
121+
<argument type="service" id="event_dispatcher" />
122+
<argument /> <!-- Retry strategy -->
123+
<argument type="service" id="logger" />
124+
125+
<tag name="console.command" command="messenger:failed:retry" />
126+
</service>
127+
128+
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
129+
<argument /> <!-- Receiver name -->
130+
<argument /> <!-- Receiver locator -->
131+
132+
<tag name="console.command" command="messenger:failed:show" />
133+
</service>
134+
135+
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
136+
<argument /> <!-- Receiver name -->
137+
<argument /> <!-- Receiver locator -->
138+
139+
<tag name="console.command" command="messenger:failed:remove" />
140+
</service>
141+
117142
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
118143
<argument type="service" id="router" />
119144
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+16-1Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
<!-- Asynchronous -->
1111
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
12-
<argument type="collection" /> <!-- Per message sender iterators -->
12+
<argument type="collection" /> <!-- Per message senders map -->
13+
<argument /> <!-- senders locator -->
1314
<argument type="collection" /> <!-- Messages to send and handle -->
1415
</service>
1516
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
@@ -91,5 +92,19 @@
9192
<argument /> <!-- multiplier -->
9293
<argument /> <!-- max delay ms -->
9394
</service>
95+
96+
<!-- failed handling -->
97+
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
98+
<tag name="kernel.event_subscriber" />
99+
<tag name="monolog.logger" channel="messenger" />
100+
<argument type="service" id="messenger.routable_message_bus" />
101+
<argument /> <!-- Failure transport name -->
102+
<argument type="service" id="logger" on-invalid="ignore" />
103+
</service>
104+
105+
<!-- routable message bus -->
106+
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
107+
<argument /> <!-- Message bus locator -->
108+
</service>
94109
</services>
95110
</container>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/ConfigurationTest.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
328328
'enabled' => !class_exists(FullStack::class) && interface_exists(MessageBusInterface::class),
329329
'routing' => [],
330330
'transports' => [],
331+
'failure_transport' => null,
331332
'serializer' => [
332333
'default_serializer' => 'messenger.transport.native_php_serializer',
333334
'symfony_serializer' => [

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+8-4Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -721,12 +721,16 @@ public function testMessengerRouting()
721721
'*' => false,
722722
];
723723

724-
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
724+
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
725725
$sendersMapping = $senderLocatorDefinition->getArgument(0);
726726
$this->assertEquals([
727-
'amqp' => new Reference('messenger.transport.amqp'),
728-
'audit' => new Reference('audit'),
729-
], $sendersMapping[DummyMessage::class]->getValues());
727+
'amqp',
728+
'audit',
729+
], $sendersMapping[DummyMessage::class]);
730+
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
731+
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
732+
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
733+
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
730734
}
731735

732736
public function testMessengerTransportConfiguration()

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* [BC BREAK] `SendersLocatorInterface` has an additional method:
8+
`getSenderByAlias()`.
9+
* A new `ListableReceiverInterface` was added, which a receiver
10+
can implement (when applicable) to enable listing and fetching
11+
individual messages by id (used in the new "Failed Messages" commands).
12+
* Both `SenderInterface::send()` and `ReceiverInterface::get()`
13+
should now (when applicable) add a `TransportMessageIdStamp`.
714
* Added `WorkerStoppedEvent` dispatched when a worker is stopped.
815
* Added optional `MessageCountAwareInterface` that receivers can implement
916
to give information about how many messages are waiting to be processed.
+112Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Helper\Dumper;
16+
use Symfony\Component\Console\Style\SymfonyStyle;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
19+
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
20+
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
21+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
22+
23+
/**
24+
* @author Ryan Weaver <ryan@symfonycasts.com>
25+
*
26+
* @internal
27+
* @experimental in 4.3
28+
*/
29+
abstract class AbstractFailedMessagesCommand extends Command
30+
{
31+
private $receiverName;
32+
private $receiver;
33+
34+
public function __construct(string $receiverName, ReceiverInterface $receiver)
35+
{
36+
$this->receiverName = $receiverName;
37+
$this->receiver = $receiver;
38+
39+
parent::__construct();
40+
}
41+
42+
protected function getReceiverName()
43+
{
44+
return $this->receiverName;
45+
}
46+
47+
/**
48+
* @return mixed|null
49+
*/
50+
protected function getMessageId(Envelope $envelope)
51+
{
52+
/** @var TransportMessageIdStamp $stamp */
53+
$stamp = $envelope->last(TransportMessageIdStamp::class);
54+
55+
return null !== $stamp ? $stamp->getId() : null;
56+
}
57+
58+
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
59+
{
60+
$io->title('Failed Message Details');
61+
62+
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
63+
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
64+
65+
$rows = [
66+
['Class', \get_class($envelope->getMessage())],
67+
];
68+
69+
if (null !== $id = $this->getMessageId($envelope)) {
70+
$rows[] = ['Message Id', $id];
71+
}
72+
73+
if (null === $sentToFailureTransportStamp) {
74+
$io->warning('Message does not appear to have been sent to this transport after failing');
75+
} else {
76+
$rows = array_merge($rows, [
77+
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
78+
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
79+
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
80+
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
81+
]);
82+
}
83+
84+
$io->table([], $rows);
85+
86+
if ($io->isVeryVerbose()) {
87+
$io->title('Message:');
88+
$dump = new Dumper($io);
89+
$io->writeln($dump($envelope->getMessage()));
90+
$io->title('Exception:');
91+
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
92+
} else {
93+
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
94+
}
95+
}
96+
97+
protected function printPendingMessagesMessage(ReceiverInterface $receiver, SymfonyStyle $io)
98+
{
99+
if ($receiver instanceof MessageCountAwareInterface) {
100+
if (1 === $receiver->getMessageCount()) {
101+
$io->writeln('There is <comment>1</comment> message pending in the failure transport.');
102+
} else {
103+
$io->writeln(sprintf('There are <comment>%d</comment> messages pending in the failure transport.', $receiver->getMessageCount()));
104+
}
105+
}
106+
}
107+
108+
protected function getReceiver(): ReceiverInterface
109+
{
110+
return $this->receiver;
111+
}
112+
}
+88Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Exception\RuntimeException;
15+
use Symfony\Component\Console\Input\InputArgument;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Input\InputOption;
18+
use Symfony\Component\Console\Output\ConsoleOutputInterface;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
use Symfony\Component\Console\Style\SymfonyStyle;
21+
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
22+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
23+
24+
/**
25+
* @author Ryan Weaver <ryan@symfonycasts.com>
26+
*
27+
* @experimental in 4.3
28+
*/
29+
class FailedMessagesRemoveCommand extends AbstractFailedMessagesCommand
30+
{
31+
protected static $defaultName = 'messenger:failed:remove';
32+
33+
/**
34+
* {@inheritdoc}
35+
*/
36+
protected function configure(): void
37+
{
38+
$this
39+
->setDefinition([
40+
new InputArgument('id', InputArgument::REQUIRED, 'Specific message id to remove'),
41+
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
42+
])
43+
->setDescription('Remove a message from the failure transport.')
44+
->setHelp(<<<'EOF'
45+
The <info>%command.name%</info> removes a message that is pending in the failure transport.
46+
47+
<info>php %command.full_name% {id}</info>
48+
49+
The specific id can be found via the messenger:failed:show command.
50+
EOF
51+
)
52+
;
53+
}
54+
55+
/**
56+
* {@inheritdoc}
57+
*/
58+
protected function execute(InputInterface $input, OutputInterface $output)
59+
{
60+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
61+
62+
$receiver = $this->getReceiver();
63+
64+
$shouldForce = $input->getOption('force');
65+
$this->removeSingleMessage($input->getArgument('id'), $receiver, $io, $shouldForce);
66+
}
67+
68+
private function removeSingleMessage($id, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
69+
{
70+
if (!$receiver instanceof ListableReceiverInterface) {
71+
throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName()));
72+
}
73+
74+
$envelope = $receiver->find($id);
75+
if (null === $envelope) {
76+
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
77+
}
78+
$this->displaySingleMessage($envelope, $io);
79+
80+
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
81+
$receiver->reject($envelope);
82+
83+
$io->success('Message removed.');
84+
} else {
85+
$io->note('Message not removed.');
86+
}
87+
}
88+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.