Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6b19228

Browse filesBrowse files
committed
[Messenger] Refactoring failure to FailedMessage & allowing for requeue
1 parent bec45ed commit 6b19228
Copy full SHA for 6b19228

28 files changed

+706
-571
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+30-28Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
use Symfony\Component\Console\Application;
4141
use Symfony\Component\Console\Command\Command;
4242
use Symfony\Component\DependencyInjection\Alias;
43+
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
4344
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
4445
use Symfony\Component\DependencyInjection\ChildDefinition;
45-
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
4646
use Symfony\Component\DependencyInjection\ContainerBuilder;
4747
use Symfony\Component\DependencyInjection\ContainerInterface;
4848
use Symfony\Component\DependencyInjection\Definition;
@@ -74,6 +74,7 @@
7474
use Symfony\Component\Lock\Store\StoreFactory;
7575
use Symfony\Component\Lock\StoreInterface;
7676
use Symfony\Component\Mailer\Mailer;
77+
use Symfony\Component\Messenger\Failure\FailedMessage;
7778
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
7879
use Symfony\Component\Messenger\MessageBus;
7980
use Symfony\Component\Messenger\MessageBusInterface;
@@ -286,9 +287,6 @@ public function load(array $configs, ContainerBuilder $container)
286287
$container->removeDefinition('console.command.messenger_debug');
287288
$container->removeDefinition('console.command.messenger_stop_workers');
288289
$container->removeDefinition('console.command.messenger_setup_transports');
289-
$container->removeDefinition('console.command.messenger_failed_messages_retry');
290-
$container->removeDefinition('console.command.messenger_failed_messages_show');
291-
$container->removeDefinition('console.command.messenger_failed_messages_remove');
292290
}
293291

294292
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
@@ -1745,19 +1743,41 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17451743

17461744
$messageToSendersMapping = [];
17471745
$messagesToSendAndHandle = [];
1746+
if ($config['failure_transport']) {
1747+
$failureTransport = $config['failure_transport'];
1748+
1749+
$loader->load('messenger_failure.xml');
1750+
$container->getDefinition('console.command.messenger_failed_messages_retry')
1751+
->replaceArgument(0, $failureTransport)
1752+
->replaceArgument(4, $transportRetryReferences[$failureTransport] ?? null);
1753+
$container->getDefinition('console.command.messenger_failed_messages_show')
1754+
->replaceArgument(0, $failureTransport);
1755+
$container->getDefinition('console.command.messenger_failed_messages_remove')
1756+
->replaceArgument(0, $failureTransport);
1757+
1758+
// push routing for FailedMessage to the failure transport
1759+
if (!isset($messageToSendersMapping[FailedMessage::class])) {
1760+
$messageToSendersMapping[FailedMessage::class] = [];
1761+
}
1762+
$messageToSendersMapping[FailedMessage::class][] = $failureTransport;
1763+
1764+
// in case this is a tagged sender, make sure it's in the aliases
1765+
if (!isset($senderAliases[$failureTransport])) {
1766+
$senderAliases[$failureTransport] = $failureTransport;
1767+
}
1768+
}
1769+
17481770
foreach ($config['routing'] as $message => $messageConfiguration) {
17491771
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17501772
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
17511773
}
1774+
$senders = [];
17521775

1753-
// make sure senderAliases contains all senders
17541776
foreach ($messageConfiguration['senders'] as $sender) {
1755-
if (!isset($senderAliases[$sender])) {
1756-
$senderAliases[$sender] = $sender;
1757-
}
1777+
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
17581778
}
17591779

1760-
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
1780+
$messageToSendersMapping[$message] = new IteratorArgument($senders);
17611781
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
17621782
}
17631783

@@ -1768,29 +1788,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17681788

17691789
$container->getDefinition('messenger.senders_locator')
17701790
->replaceArgument(0, $messageToSendersMapping)
1771-
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
1772-
->replaceArgument(2, $messagesToSendAndHandle)
1791+
->replaceArgument(1, $messagesToSendAndHandle)
17731792
;
17741793

17751794
$container->getDefinition('messenger.retry_strategy_locator')
17761795
->replaceArgument(0, $transportRetryReferences);
1777-
1778-
if ($config['failure_transport']) {
1779-
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1780-
->replaceArgument(1, $config['failure_transport']);
1781-
$container->getDefinition('console.command.messenger_failed_messages_retry')
1782-
->replaceArgument(0, $config['failure_transport'])
1783-
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
1784-
$container->getDefinition('console.command.messenger_failed_messages_show')
1785-
->replaceArgument(0, $config['failure_transport']);
1786-
$container->getDefinition('console.command.messenger_failed_messages_remove')
1787-
->replaceArgument(0, $config['failure_transport']);
1788-
} else {
1789-
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
1790-
$container->removeDefinition('console.command.messenger_failed_messages_retry');
1791-
$container->removeDefinition('console.command.messenger_failed_messages_show');
1792-
$container->removeDefinition('console.command.messenger_failed_messages_remove');
1793-
}
17941796
}
17951797

17961798
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
-25Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -114,31 +114,6 @@
114114
<tag name="console.command" command="messenger:stop-workers" />
115115
</service>
116116

117-
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
118-
<argument /> <!-- Receiver name -->
119-
<argument /> <!-- Receiver locator -->
120-
<argument type="service" id="messenger.routable_message_bus" />
121-
<argument type="service" id="event_dispatcher" />
122-
<argument /> <!-- Retry strategy -->
123-
<argument type="service" id="logger" />
124-
125-
<tag name="console.command" command="messenger:failed:retry" />
126-
</service>
127-
128-
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
129-
<argument /> <!-- Receiver name -->
130-
<argument /> <!-- Receiver locator -->
131-
132-
<tag name="console.command" command="messenger:failed:show" />
133-
</service>
134-
135-
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
136-
<argument /> <!-- Receiver name -->
137-
<argument /> <!-- Receiver locator -->
138-
139-
<tag name="console.command" command="messenger:failed:remove" />
140-
</service>
141-
142117
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
143118
<argument type="service" id="router" />
144119
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+1-11Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99

1010
<!-- Asynchronous -->
1111
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
12-
<argument type="collection" /> <!-- Per message senders map -->
13-
<argument /> <!-- senders locator -->
12+
<argument type="collection" /> <!-- Per message sender iterators -->
1413
<argument type="collection" /> <!-- Messages to send and handle -->
1514
</service>
1615
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
@@ -93,15 +92,6 @@
9392
<argument /> <!-- max delay ms -->
9493
</service>
9594

96-
<!-- failed handling -->
97-
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
98-
<tag name="kernel.event_subscriber" />
99-
<tag name="monolog.logger" channel="messenger" />
100-
<argument type="service" id="messenger.routable_message_bus" />
101-
<argument /> <!-- Failure transport name -->
102-
<argument type="service" id="logger" on-invalid="ignore" />
103-
</service>
104-
10595
<!-- routable message bus -->
10696
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
10797
<argument /> <!-- Message bus locator -->
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" ?>
2+
3+
<container xmlns="http://symfony.com/schema/dic/services"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd">
6+
7+
<services>
8+
<defaults public="false" />
9+
10+
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\Failure\SendFailedMessageToFailureTransportListener">
11+
<tag name="kernel.event_subscriber" />
12+
<tag name="monolog.logger" channel="messenger" />
13+
<argument type="service" id="messenger.routable_message_bus" />
14+
<argument /> <!-- Failure transport name -->
15+
<argument type="service" id="logger" on-invalid="ignore" />
16+
</service>
17+
18+
<service id="messenger.failure.failed_message_handler" class="Symfony\Component\Messenger\Failure\FailedMessageHandler">
19+
<tag name="messenger.message_handler" handles="Symfony\Component\Messenger\Failure\FailedMessage" method="__invoke" />
20+
<tag name="monolog.logger" channel="messenger" />
21+
<argument type="service" id="messenger.routable_message_bus" />
22+
<argument type="service" id="logger" on-invalid="ignore" />
23+
</service>
24+
25+
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
26+
<argument /> <!-- Receiver name -->
27+
<argument /> <!-- Receiver locator -->
28+
<argument type="service" id="messenger.routable_message_bus" />
29+
<argument type="service" id="event_dispatcher" />
30+
<argument /> <!-- Retry strategy -->
31+
<argument type="service" id="logger" />
32+
33+
<tag name="console.command" command="messenger:failed:retry" />
34+
</service>
35+
36+
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
37+
<argument /> <!-- Receiver name -->
38+
<argument /> <!-- Receiver locator -->
39+
40+
<tag name="console.command" command="messenger:failed:show" />
41+
</service>
42+
43+
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
44+
<argument /> <!-- Receiver name -->
45+
<argument /> <!-- Receiver locator -->
46+
47+
<tag name="console.command" command="messenger:failed:remove" />
48+
</service>
49+
</services>
50+
</container>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+4-8Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -721,16 +721,12 @@ public function testMessengerRouting()
721721
'*' => false,
722722
];
723723

724-
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
724+
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
725725
$sendersMapping = $senderLocatorDefinition->getArgument(0);
726726
$this->assertEquals([
727-
'amqp',
728-
'audit',
729-
], $sendersMapping[DummyMessage::class]);
730-
$sendersLocator = $container->getDefinition((string) $senderLocatorDefinition->getArgument(1));
731-
$this->assertSame(['amqp', 'audit'], array_keys($sendersLocator->getArgument(0)));
732-
$this->assertEquals(new Reference('messenger.transport.amqp'), $sendersLocator->getArgument(0)['amqp']->getValues()[0]);
733-
$this->assertEquals(new Reference('audit'), $sendersLocator->getArgument(0)['audit']->getValues()[0]);
727+
'amqp' => new Reference('messenger.transport.amqp'),
728+
'audit' => new Reference('audit'),
729+
], $sendersMapping[DummyMessage::class]->getValues());
734730
}
735731

736732
public function testMessengerTransportConfiguration()

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
-2Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ CHANGELOG
44
4.3.0
55
-----
66

7-
* [BC BREAK] `SendersLocatorInterface` has an additional method:
8-
`getSenderByAlias()`.
97
* A new `ListableReceiverInterface` was added, which a receiver
108
can implement (when applicable) to enable listing and fetching
119
individual messages by id (used in the new "Failed Messages" commands).

‎src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+24-15Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
use Symfony\Component\Console\Helper\Dumper;
1616
use Symfony\Component\Console\Style\SymfonyStyle;
1717
use Symfony\Component\Messenger\Envelope;
18-
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
18+
use Symfony\Component\Messenger\Failure\FailedMessage;
19+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1920
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2021
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -59,27 +60,27 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
5960
{
6061
$io->title('Failed Message Details');
6162

62-
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
63-
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
63+
$message = $envelope->getMessage();
64+
if (!$message instanceof FailedMessage) {
65+
$io->warning('Message does not appear to have been sent to this transport after failing');
66+
67+
return;
68+
}
6469

6570
$rows = [
66-
['Class', \get_class($envelope->getMessage())],
71+
['Class', \get_class($message->getFailedEnvelope()->getMessage())],
6772
];
6873

6974
if (null !== $id = $this->getMessageId($envelope)) {
7075
$rows[] = ['Message Id', $id];
7176
}
7277

73-
if (null === $sentToFailureTransportStamp) {
74-
$io->warning('Message does not appear to have been sent to this transport after failing');
75-
} else {
76-
$rows = array_merge($rows, [
77-
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
78-
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
79-
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
80-
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
81-
]);
82-
}
78+
$rows = array_merge($rows, [
79+
['Failed at', $message->getFailedAt()->format('Y-m-d H:i:s')],
80+
['Error', $message->getExceptionMessage()],
81+
['Error Class', $message->getFlattenException() ? $message->getFlattenException()->getClass() : '(unknown)'],
82+
['Transport', $this->getOriginalTransportName($message->getFailedEnvelope())],
83+
]);
8384

8485
$io->table([], $rows);
8586

@@ -88,7 +89,7 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
8889
$dump = new Dumper($io);
8990
$io->writeln($dump($envelope->getMessage()));
9091
$io->title('Exception:');
91-
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
92+
$io->writeln($message->getFlattenException()->getTraceAsString());
9293
} else {
9394
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
9495
}
@@ -109,4 +110,12 @@ protected function getReceiver(): ReceiverInterface
109110
{
110111
return $this->receiver;
111112
}
113+
114+
private function getOriginalTransportName(Envelope $envelope): ?string
115+
{
116+
/** @var ReceivedStamp $receivedStamp */
117+
$receivedStamp = $envelope->last(ReceivedStamp::class);
118+
119+
return null === $receivedStamp ? null : $receivedStamp->getTransportName();
120+
}
112121
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.