Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 86975f5

Browse filesBrowse files
committed
[Messenger] Refactoring failure to FailedMessage & allowing for requeue
1 parent bec45ed commit 86975f5
Copy full SHA for 86975f5

18 files changed

+626
-310
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+25-20Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
use Symfony\Component\Lock\Store\StoreFactory;
7575
use Symfony\Component\Lock\StoreInterface;
7676
use Symfony\Component\Mailer\Mailer;
77+
use Symfony\Component\Messenger\Failure\FailedMessage;
7778
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
7879
use Symfony\Component\Messenger\MessageBus;
7980
use Symfony\Component\Messenger\MessageBusInterface;
@@ -286,9 +287,6 @@ public function load(array $configs, ContainerBuilder $container)
286287
$container->removeDefinition('console.command.messenger_debug');
287288
$container->removeDefinition('console.command.messenger_stop_workers');
288289
$container->removeDefinition('console.command.messenger_setup_transports');
289-
$container->removeDefinition('console.command.messenger_failed_messages_retry');
290-
$container->removeDefinition('console.command.messenger_failed_messages_show');
291-
$container->removeDefinition('console.command.messenger_failed_messages_remove');
292290
}
293291

294292
$propertyInfoEnabled = $this->isConfigEnabled($container, $config['property_info']);
@@ -1745,6 +1743,30 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17451743

17461744
$messageToSendersMapping = [];
17471745
$messagesToSendAndHandle = [];
1746+
if ($config['failure_transport']) {
1747+
$failureTransport = $config['failure_transport'];
1748+
1749+
$loader->load('messenger_failure.xml');
1750+
$container->getDefinition('console.command.messenger_failed_messages_retry')
1751+
->replaceArgument(0, $failureTransport)
1752+
->replaceArgument(4, $transportRetryReferences[$failureTransport] ?? null);
1753+
$container->getDefinition('console.command.messenger_failed_messages_show')
1754+
->replaceArgument(0, $failureTransport);
1755+
$container->getDefinition('console.command.messenger_failed_messages_remove')
1756+
->replaceArgument(0, $failureTransport);
1757+
1758+
// push routing for FailedMessage to the failure transport
1759+
if (!isset($messageToSendersMapping[FailedMessage::class])) {
1760+
$messageToSendersMapping[FailedMessage::class] = [];
1761+
}
1762+
$messageToSendersMapping[FailedMessage::class][] = $failureTransport;
1763+
1764+
// in case this is a tagged sender, make sure it's in the aliases
1765+
if (!isset($senderAliases[$failureTransport])) {
1766+
$senderAliases[$failureTransport] = $failureTransport;
1767+
}
1768+
}
1769+
17481770
foreach ($config['routing'] as $message => $messageConfiguration) {
17491771
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17501772
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
@@ -1774,23 +1796,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17741796

17751797
$container->getDefinition('messenger.retry_strategy_locator')
17761798
->replaceArgument(0, $transportRetryReferences);
1777-
1778-
if ($config['failure_transport']) {
1779-
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1780-
->replaceArgument(1, $config['failure_transport']);
1781-
$container->getDefinition('console.command.messenger_failed_messages_retry')
1782-
->replaceArgument(0, $config['failure_transport'])
1783-
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
1784-
$container->getDefinition('console.command.messenger_failed_messages_show')
1785-
->replaceArgument(0, $config['failure_transport']);
1786-
$container->getDefinition('console.command.messenger_failed_messages_remove')
1787-
->replaceArgument(0, $config['failure_transport']);
1788-
} else {
1789-
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
1790-
$container->removeDefinition('console.command.messenger_failed_messages_retry');
1791-
$container->removeDefinition('console.command.messenger_failed_messages_show');
1792-
$container->removeDefinition('console.command.messenger_failed_messages_remove');
1793-
}
17941799
}
17951800

17961801
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
-25Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -114,31 +114,6 @@
114114
<tag name="console.command" command="messenger:stop-workers" />
115115
</service>
116116

117-
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
118-
<argument /> <!-- Receiver name -->
119-
<argument /> <!-- Receiver locator -->
120-
<argument type="service" id="messenger.routable_message_bus" />
121-
<argument type="service" id="event_dispatcher" />
122-
<argument /> <!-- Retry strategy -->
123-
<argument type="service" id="logger" />
124-
125-
<tag name="console.command" command="messenger:failed:retry" />
126-
</service>
127-
128-
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
129-
<argument /> <!-- Receiver name -->
130-
<argument /> <!-- Receiver locator -->
131-
132-
<tag name="console.command" command="messenger:failed:show" />
133-
</service>
134-
135-
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
136-
<argument /> <!-- Receiver name -->
137-
<argument /> <!-- Receiver locator -->
138-
139-
<tag name="console.command" command="messenger:failed:remove" />
140-
</service>
141-
142117
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
143118
<argument type="service" id="router" />
144119
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
-9Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,6 @@
9393
<argument /> <!-- max delay ms -->
9494
</service>
9595

96-
<!-- failed handling -->
97-
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
98-
<tag name="kernel.event_subscriber" />
99-
<tag name="monolog.logger" channel="messenger" />
100-
<argument type="service" id="messenger.routable_message_bus" />
101-
<argument /> <!-- Failure transport name -->
102-
<argument type="service" id="logger" on-invalid="ignore" />
103-
</service>
104-
10596
<!-- routable message bus -->
10697
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
10798
<argument /> <!-- Message bus locator -->
+50Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" ?>
2+
3+
<container xmlns="http://symfony.com/schema/dic/services"
4+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd">
6+
7+
<services>
8+
<defaults public="false" />
9+
10+
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\Failure\SendFailedMessageToFailureTransportListener">
11+
<tag name="kernel.event_subscriber" />
12+
<tag name="monolog.logger" channel="messenger" />
13+
<argument type="service" id="messenger.routable_message_bus" />
14+
<argument /> <!-- Failure transport name -->
15+
<argument type="service" id="logger" on-invalid="ignore" />
16+
</service>
17+
18+
<service id="messenger.failure.failed_message_handler" class="Symfony\Component\Messenger\Failure\FailedMessageHandler">
19+
<tag name="messenger.message_handler" handles="Symfony\Component\Messenger\Failure\FailedMessage" method="__invoke" />
20+
<tag name="monolog.logger" channel="messenger" />
21+
<argument type="service" id="messenger.routable_message_bus" />
22+
<argument type="service" id="logger" on-invalid="ignore" />
23+
</service>
24+
25+
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
26+
<argument /> <!-- Receiver name -->
27+
<argument /> <!-- Receiver locator -->
28+
<argument type="service" id="messenger.routable_message_bus" />
29+
<argument type="service" id="event_dispatcher" />
30+
<argument /> <!-- Retry strategy -->
31+
<argument type="service" id="logger" />
32+
33+
<tag name="console.command" command="messenger:failed:retry" />
34+
</service>
35+
36+
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
37+
<argument /> <!-- Receiver name -->
38+
<argument /> <!-- Receiver locator -->
39+
40+
<tag name="console.command" command="messenger:failed:show" />
41+
</service>
42+
43+
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
44+
<argument /> <!-- Receiver name -->
45+
<argument /> <!-- Receiver locator -->
46+
47+
<tag name="console.command" command="messenger:failed:remove" />
48+
</service>
49+
</services>
50+
</container>

‎src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+24-15Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
use Symfony\Component\Console\Helper\Dumper;
1616
use Symfony\Component\Console\Style\SymfonyStyle;
1717
use Symfony\Component\Messenger\Envelope;
18-
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
18+
use Symfony\Component\Messenger\Failure\FailedMessage;
19+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
1920
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2021
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
@@ -59,27 +60,27 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
5960
{
6061
$io->title('Failed Message Details');
6162

62-
/** @var SentToFailureTransportStamp $sentToFailureTransportStamp */
63-
$sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class);
63+
$message = $envelope->getMessage();
64+
if (!$message instanceof FailedMessage) {
65+
$io->warning('Message does not appear to have been sent to this transport after failing');
66+
67+
return;
68+
}
6469

6570
$rows = [
66-
['Class', \get_class($envelope->getMessage())],
71+
['Class', \get_class($message->getFailedEnvelope()->getMessage())],
6772
];
6873

6974
if (null !== $id = $this->getMessageId($envelope)) {
7075
$rows[] = ['Message Id', $id];
7176
}
7277

73-
if (null === $sentToFailureTransportStamp) {
74-
$io->warning('Message does not appear to have been sent to this transport after failing');
75-
} else {
76-
$rows = array_merge($rows, [
77-
['Failed at', $sentToFailureTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
78-
['Error', $sentToFailureTransportStamp->getExceptionMessage()],
79-
['Error Class', $sentToFailureTransportStamp->getFlattenException() ? $sentToFailureTransportStamp->getFlattenException()->getClass() : '(unknown)'],
80-
['Transport', $sentToFailureTransportStamp->getOriginalReceiverName()],
81-
]);
82-
}
78+
$rows = array_merge($rows, [
79+
['Failed at', $message->getFailedAt()->format('Y-m-d H:i:s')],
80+
['Error', $message->getExceptionMessage()],
81+
['Error Class', $message->getFlattenException() ? $message->getFlattenException()->getClass() : '(unknown)'],
82+
['Transport', $this->getOriginalTransportName($message->getFailedEnvelope())],
83+
]);
8384

8485
$io->table([], $rows);
8586

@@ -88,7 +89,7 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
8889
$dump = new Dumper($io);
8990
$io->writeln($dump($envelope->getMessage()));
9091
$io->title('Exception:');
91-
$io->writeln($sentToFailureTransportStamp->getFlattenException()->getTraceAsString());
92+
$io->writeln($message->getFlattenException()->getTraceAsString());
9293
} else {
9394
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
9495
}
@@ -109,4 +110,12 @@ protected function getReceiver(): ReceiverInterface
109110
{
110111
return $this->receiver;
111112
}
113+
114+
private function getOriginalTransportName(Envelope $envelope): ?string
115+
{
116+
/** @var ReceivedStamp $receivedStamp */
117+
$receivedStamp = $envelope->last(ReceivedStamp::class);
118+
119+
return null === $receivedStamp ? null : $receivedStamp->getTransportName();
120+
}
112121
}

‎src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+43-16Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
use Symfony\Component\Messenger\Envelope;
2424
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
2525
use Symfony\Component\Messenger\Exception\LogicException;
26+
use Symfony\Component\Messenger\Failure\FailedMessage;
2627
use Symfony\Component\Messenger\MessageBusInterface;
2728
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2829
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
@@ -63,8 +64,9 @@ protected function configure(): void
6364
->setDefinition([
6465
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6566
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
67+
new InputOption('resend', null, InputOption::VALUE_NONE, 'Resend messages to the original transport instead of retrying immediately'),
6668
])
67-
->setDescription('Retries one or more messages from the failure transport.')
69+
->setDescription('Retries (or requeues) one or more messages from the failure transport.')
6870
->setHelp(<<<'EOF'
6971
The <info>%command.name%</info> retries message in the failure transport.
7072
@@ -101,25 +103,26 @@ protected function execute(InputInterface $input, OutputInterface $output)
101103
$receiver = $this->getReceiver();
102104
$this->printPendingMessagesMessage($receiver, $io);
103105

104-
$io->writeln(sprintf('To retry all the messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
106+
$io->writeln(sprintf('To requeue all messages, run <comment>messenger:consume %s</comment>', $this->getReceiverName()));
105107

106108
$shouldForce = $input->getOption('force');
109+
$shouldResend = $input->getOption('resend');
107110
$ids = $input->getArgument('id');
108111
if (0 === \count($ids)) {
109112
if (!$input->isInteractive()) {
110113
throw new RuntimeException('Message id must be passed when in non-interactive mode.');
111114
}
112115

113-
$this->runInteractive($io, $shouldForce);
116+
$this->runInteractive($io, $shouldForce, $shouldResend);
114117

115118
return;
116119
}
117120

118-
$this->retrySpecificIds($ids, $io, $shouldForce);
121+
$this->retrySpecificIds($ids, $io, $shouldForce, $shouldResend);
119122
$io->success('All done!');
120123
}
121124

122-
private function runInteractive(SymfonyStyle $io, bool $shouldForce)
125+
private function runInteractive(SymfonyStyle $io, bool $shouldForce, bool $shouldResend)
123126
{
124127
$receiver = $this->getReceiver();
125128
$count = 0;
@@ -146,7 +149,7 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)
146149
break;
147150
}
148151

149-
$this->retrySpecificIds($ids, $io, $shouldForce);
152+
$this->retrySpecificIds($ids, $io, $shouldForce, $shouldResend);
150153
}
151154
} else {
152155
// get() and ask messages one-by-one
@@ -159,21 +162,45 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce)
159162
}
160163
}
161164

162-
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int
165+
private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldResend): int
163166
{
164-
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce) {
167+
$listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, $shouldResend) {
165168
$envelope = $messageReceivedEvent->getEnvelope();
169+
$message = $envelope->getMessage();
170+
if (!$message instanceof FailedMessage) {
171+
$io->error('Message does not appear to be a failed message. Skipping.');
166172

167-
$this->displaySingleMessage($envelope, $io);
168-
169-
$shouldHandle = $shouldForce || $io->confirm('Do you want to retry (yes) or delete this message (no)?');
173+
// don't handle it, but don't reject it either
174+
$messageReceivedEvent->shouldHandle(false);
170175

171-
if ($shouldHandle) {
172176
return;
173177
}
174178

175-
$messageReceivedEvent->shouldHandle(false);
176-
$receiver->reject($envelope);
179+
$this->displaySingleMessage($envelope, $io);
180+
181+
$defaultAction = $shouldResend ? 'resend' : 'retry';
182+
if ($shouldForce) {
183+
$action = $defaultAction;
184+
} else {
185+
$action = $io->choice(
186+
'Do you want to retry, resend or delete this message?',
187+
['retry' => 'Retry now', 'resend' => 'Resend to original transport', 'delete' => 'Delete']
188+
);
189+
}
190+
191+
switch ($action) {
192+
case 'retry':
193+
$message->setToRetryStrategy();
194+
break;
195+
case 'resend':
196+
// do nothing: normal processing will resend
197+
$message->setToResendStrategy();
198+
break;
199+
case 'delete':
200+
$messageReceivedEvent->shouldHandle(false);
201+
$receiver->reject($envelope);
202+
break;
203+
}
177204
};
178205
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
179206

@@ -200,7 +227,7 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $
200227
return $count;
201228
}
202229

203-
private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce)
230+
private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce, bool $shouldResend)
204231
{
205232
$receiver = $this->getReceiver();
206233

@@ -215,7 +242,7 @@ private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForc
215242
}
216243

217244
$singleReceiver = new SingleMessageReceiver($receiver, $envelope);
218-
$this->runWorker($singleReceiver, $io, $shouldForce);
245+
$this->runWorker($singleReceiver, $io, $shouldForce, $shouldResend);
219246
}
220247
}
221248
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.