Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e151d3c

Browse filesBrowse files
committed
[WIP] Adding failed transport support
1 parent 7e2fbe1 commit e151d3c
Copy full SHA for e151d3c

File tree

51 files changed

+1800
-149
lines changed
Filter options

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner

51 files changed

+1800
-149
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,6 +1196,10 @@ function ($a) {
11961196
->end()
11971197
->end()
11981198
->end()
1199+
->scalarNode('failure_transport')
1200+
->defaultNull()
1201+
->info('Transport name to send failed messages to (after all retries have failed).')
1202+
->end()
11991203
->scalarNode('default_bus')->defaultNull()->end()
12001204
->arrayNode('buses')
12011205
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+25-5Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
use Symfony\Component\Console\Application;
4141
use Symfony\Component\Console\Command\Command;
4242
use Symfony\Component\DependencyInjection\Alias;
43-
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
4443
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
4544
use Symfony\Component\DependencyInjection\ChildDefinition;
45+
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
4646
use Symfony\Component\DependencyInjection\ContainerBuilder;
4747
use Symfony\Component\DependencyInjection\ContainerInterface;
4848
use Symfony\Component\DependencyInjection\Definition;
@@ -1729,22 +1729,42 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17291729
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17301730
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
17311731
}
1732-
$senders = [];
1732+
1733+
// make sure senderAliases contains all senders
17331734
foreach ($messageConfiguration['senders'] as $sender) {
1734-
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
1735+
if (!isset($senderAliases[$sender])) {
1736+
$senderAliases[$sender] = $sender;
1737+
}
17351738
}
17361739

1737-
$messageToSendersMapping[$message] = new IteratorArgument($senders);
1740+
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
17381741
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
17391742
}
17401743

1744+
$senderReferences = [];
1745+
foreach ($senderAliases as $alias => $serviceId) {
1746+
$senderReferences[$alias] = new Reference($serviceId);
1747+
}
1748+
17411749
$container->getDefinition('messenger.senders_locator')
17421750
->replaceArgument(0, $messageToSendersMapping)
1743-
->replaceArgument(1, $messagesToSendAndHandle)
1751+
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
1752+
->replaceArgument(2, $messagesToSendAndHandle)
17441753
;
17451754

17461755
$container->getDefinition('messenger.retry_strategy_locator')
17471756
->replaceArgument(0, $transportRetryReferences);
1757+
1758+
$container->getDefinition('messenger.failure.send_failed_message_to_failed_transport_listener')
1759+
->replaceArgument(1, $config['failure_transport']);
1760+
1761+
$container->getDefinition('console.command.messenger_failed_messages_retry')
1762+
->replaceArgument(0, $config['failure_transport'])
1763+
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
1764+
$container->getDefinition('console.command.messenger_failed_messages_show')
1765+
->replaceArgument(0, $config['failure_transport']);
1766+
$container->getDefinition('console.command.messenger_failed_messages_purge')
1767+
->replaceArgument(0, $config['failure_transport']);
17481768
}
17491769

17501770
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+25Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,31 @@
114114
<tag name="console.command" command="messenger:stop-workers" />
115115
</service>
116116

117+
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
118+
<argument /> <!-- Receiver name -->
119+
<argument /> <!-- Receiver locator -->
120+
<argument type="service" id="messenger.routable_message_bus" />
121+
<argument type="service" id="event_dispatcher" />
122+
<argument /> <!-- Retry strategy -->
123+
<argument type="service" id="logger" />
124+
125+
<tag name="console.command" command="messenger:failed:retry" />
126+
</service>
127+
128+
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
129+
<argument /> <!-- Receiver name -->
130+
<argument /> <!-- Receiver locator -->
131+
132+
<tag name="console.command" command="messenger:failed:show" />
133+
</service>
134+
135+
<service id="console.command.messenger_failed_messages_purge" class="Symfony\Component\Messenger\Command\FailedMessagesPurgeCommand">
136+
<argument /> <!-- Receiver name -->
137+
<argument /> <!-- Receiver locator -->
138+
139+
<tag name="console.command" command="messenger:failed:purge" />
140+
</service>
141+
117142
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
118143
<argument type="service" id="router" />
119144
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+16-1Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
<!-- Asynchronous -->
1111
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
12-
<argument type="collection" /> <!-- Per message sender iterators -->
12+
<argument type="collection" /> <!-- Per message senders map -->
13+
<argument /> <!-- senders locator -->
1314
<argument type="collection" /> <!-- Messages to send and handle -->
1415
</service>
1516
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
@@ -87,5 +88,19 @@
8788
<argument /> <!-- multiplier -->
8889
<argument /> <!-- max delay ms -->
8990
</service>
91+
92+
<!-- failed handling -->
93+
<service id="messenger.failure.send_failed_message_to_failed_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailedTransportListener">
94+
<tag name="kernel.event_subscriber" />
95+
<tag name="monolog.logger" channel="messenger" />
96+
<argument /> <!-- Message bus locator -->
97+
<argument /> <!-- Failed transport name -->
98+
<argument type="service" id="logger" on-invalid="ignore" />
99+
</service>
100+
101+
<!-- routable event dispatcher -->
102+
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
103+
<argument /> <!-- Message bus locator -->
104+
</service>
90105
</services>
91106
</container>

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+4-2Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* [BC BREAK] `SendersLocatorInterface` has an additional method:
8+
`getSenderByAlias()`.
79
* Added optional `MessageCountAwareInterface` that receivers can implement
810
to give information about how many messages are waiting to be processed.
911
* [BC BREAK] The `Envelope::__construct()` signature changed:
@@ -32,8 +34,8 @@ CHANGELOG
3234
to the `Envelope` then find the correct bus when receiving from
3335
the transport. See `ConsumeMessagesCommand`.
3436
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
35-
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
36-
`ack()`, `reject()` and `get()`. The methods `receive()`
37+
* [BC BREAK] 4 new methods were added to `ReceiverInterface`:
38+
`ack()`, `reject()`, `get()` and `purge()`. The methods `receive()`
3739
and `stop()` were removed.
3840
* [BC BREAK] Error handling was moved from the receivers into
3941
`Worker`. Implementations of `ReceiverInterface::handle()`
+99Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Style\SymfonyStyle;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Stamp\ReceiverMessageIdStampInterface;
18+
use Symfony\Component\Messenger\Stamp\SentToFailedTransportStamp;
19+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
20+
21+
/**
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*
24+
* @internal
25+
* @experimental in 4.3
26+
*/
27+
abstract class AbstractFailedMessagesCommand extends Command
28+
{
29+
private $receiverName;
30+
private $receiver;
31+
32+
public function __construct(string $receiverName, ReceiverInterface $receiver)
33+
{
34+
$this->receiverName = $receiverName;
35+
$this->receiver = $receiver;
36+
37+
parent::__construct();
38+
}
39+
40+
protected function getReceiverName()
41+
{
42+
return $this->receiverName;
43+
}
44+
45+
protected function getMessageId(Envelope $envelope)
46+
{
47+
foreach ($envelope->all() as $stamps) {
48+
// get the last stamp
49+
$stamp = end($stamps);
50+
if ($stamp instanceof ReceiverMessageIdStampInterface) {
51+
return $stamp->getId();
52+
}
53+
}
54+
}
55+
56+
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
57+
{
58+
$io->title('Failed Message Details');
59+
60+
/** @var SentToFailedTransportStamp $sentToFailedTransportStamp */
61+
$sentToFailedTransportStamp = $envelope->last(SentToFailedTransportStamp::class);
62+
63+
$rows = [
64+
['Class', \get_class($envelope->getMessage())],
65+
];
66+
67+
if (null !== $id = $this->getMessageId($envelope)) {
68+
$rows[] = ['Message Id', $id];
69+
}
70+
71+
if (null === $sentToFailedTransportStamp) {
72+
$io->warning('Message does not appear to have been sent to this transport after failing');
73+
} else {
74+
$rows = array_merge($rows, [
75+
['Failed at', $sentToFailedTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
76+
['Error', $sentToFailedTransportStamp->getExceptionMessage()],
77+
['Error Class', $sentToFailedTransportStamp->getFlattenException() ? $sentToFailedTransportStamp->getFlattenException()->getClass() : '(unknown)'],
78+
['Transport', $sentToFailedTransportStamp->getOriginalReceiverName()],
79+
]);
80+
}
81+
82+
$io->table([], $rows);
83+
84+
if ($io->isVeryVerbose()) {
85+
if (\function_exists('dump')) {
86+
dump($envelope->getMessage());
87+
}
88+
$io->writeln(' Trace:');
89+
$io->writeln($sentToFailedTransportStamp->getFlattenException()->getTraceAsString());
90+
} else {
91+
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
92+
}
93+
}
94+
95+
protected function getReceiver(): ReceiverInterface
96+
{
97+
return $this->receiver;
98+
}
99+
}

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
-2Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,8 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
5353
{
5454
if (\is_array($retryStrategyLocator)) {
5555
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
56-
5756
$retryStrategyLocator = null;
5857
}
59-
6058
$this->busLocator = $busLocator;
6159
$this->receiverLocator = $receiverLocator;
6260
$this->logger = $logger;
+100Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Exception\RuntimeException;
15+
use Symfony\Component\Console\Input\InputArgument;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Input\InputOption;
18+
use Symfony\Component\Console\Output\ConsoleOutputInterface;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
use Symfony\Component\Console\Style\SymfonyStyle;
21+
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
22+
23+
/**
24+
* @author Ryan Weaver <ryan@symfonycasts.com>
25+
*
26+
* @experimental in 4.3
27+
*/
28+
class FailedMessagesPurgeCommand extends AbstractFailedMessagesCommand
29+
{
30+
protected static $defaultName = 'messenger:failed:purge';
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
protected function configure(): void
36+
{
37+
$this
38+
->setDefinition([
39+
new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to remove'),
40+
new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'),
41+
])
42+
->setDescription('Remove one or more messages from the failure transport.')
43+
->setHelp(<<<'EOF'
44+
The <info>%command.name%</info> removes message that are waiting in the failure transport.
45+
46+
<info>php %command.full_name%</info>
47+
EOF
48+
)
49+
;
50+
}
51+
52+
/**
53+
* {@inheritdoc}
54+
*/
55+
protected function execute(InputInterface $input, OutputInterface $output)
56+
{
57+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
58+
59+
$receiver = $this->getReceiver();
60+
61+
if (!$receiver instanceof ListableReceiverInterface) {
62+
throw new RuntimeException(sprintf('The "%s" receiver does not support deleting one or all messages.', $this->getReceiverName()));
63+
}
64+
65+
$shouldForce = $input->getOption('force');
66+
if (null === $id = $input->getArgument('id')) {
67+
$this->removeAllMessages($receiver, $io, $shouldForce);
68+
} else {
69+
$this->removeSingleMessage($id, $receiver, $io, $shouldForce);
70+
}
71+
}
72+
73+
private function removeSingleMessage($id, ListableReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
74+
{
75+
$envelope = $receiver->find($id);
76+
if (null === $envelope) {
77+
throw new RuntimeException(sprintf('The message with id "%s" was not found.', $id));
78+
}
79+
$this->displaySingleMessage($envelope, $io);
80+
81+
if ($shouldForce || $io->confirm('Do you want to permanently remove this message?', false)) {
82+
$receiver->reject($envelope);
83+
84+
$io->success('Message removed.');
85+
} else {
86+
$io->note('Message not remove.');
87+
}
88+
}
89+
90+
private function removeAllMessages(ListableReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce)
91+
{
92+
if ($shouldForce || $io->confirm('Do you want to permanently remove ALL failed messages ?', false)) {
93+
$receiver->purge();
94+
95+
$io->success('All messages were removed.');
96+
} else {
97+
$io->note('Messages not remove.');
98+
}
99+
}
100+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.