Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8e203fc

Browse filesBrowse files
committed
[WIP] Adding failed transport support
1 parent 7e2fbe1 commit 8e203fc
Copy full SHA for 8e203fc

File tree

53 files changed

+1828
-179
lines changed
Filter options

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner

53 files changed

+1828
-179
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,6 +1196,10 @@ function ($a) {
11961196
->end()
11971197
->end()
11981198
->end()
1199+
->scalarNode('failure_transport')
1200+
->defaultNull()
1201+
->info('Transport name to send failed messages to (after all retries have failed).')
1202+
->end()
11991203
->scalarNode('default_bus')->defaultNull()->end()
12001204
->arrayNode('buses')
12011205
->defaultValue(['messenger.bus.default' => ['default_middleware' => true, 'middleware' => []]])

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+24-5Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@
4040
use Symfony\Component\Console\Application;
4141
use Symfony\Component\Console\Command\Command;
4242
use Symfony\Component\DependencyInjection\Alias;
43-
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
4443
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
4544
use Symfony\Component\DependencyInjection\ChildDefinition;
45+
use Symfony\Component\DependencyInjection\Compiler\ServiceLocatorTagPass;
4646
use Symfony\Component\DependencyInjection\ContainerBuilder;
4747
use Symfony\Component\DependencyInjection\ContainerInterface;
4848
use Symfony\Component\DependencyInjection\Definition;
@@ -1729,22 +1729,41 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17291729
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17301730
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
17311731
}
1732-
$senders = [];
1732+
1733+
// make sure senderAliases contains all senders
17331734
foreach ($messageConfiguration['senders'] as $sender) {
1734-
$senders[$sender] = new Reference($senderAliases[$sender] ?? $sender);
1735+
if (!isset($senderAliases[$sender])) {
1736+
$senderAliases[$sender] = $sender;
1737+
}
17351738
}
17361739

1737-
$messageToSendersMapping[$message] = new IteratorArgument($senders);
1740+
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
17381741
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
17391742
}
17401743

1744+
$senderReferences = [];
1745+
foreach ($senderAliases as $alias => $serviceId) {
1746+
$senderReferences[$alias] = new Reference($serviceId);
1747+
}
1748+
17411749
$container->getDefinition('messenger.senders_locator')
17421750
->replaceArgument(0, $messageToSendersMapping)
1743-
->replaceArgument(1, $messagesToSendAndHandle)
1751+
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
1752+
->replaceArgument(2, $messagesToSendAndHandle)
17441753
;
17451754

17461755
$container->getDefinition('messenger.retry_strategy_locator')
17471756
->replaceArgument(0, $transportRetryReferences);
1757+
1758+
$container->getDefinition('messenger.failure.send_failed_message_to_failed_transport_listener')
1759+
->replaceArgument(1, $config['failure_transport']);
1760+
1761+
$container->getDefinition('console.command.messenger_failed_messages_retry')
1762+
->replaceArgument(0, $config['failure_transport']);
1763+
$container->getDefinition('console.command.messenger_failed_messages_show')
1764+
->replaceArgument(0, $config['failure_transport']);
1765+
$container->getDefinition('console.command.messenger_failed_messages_purge')
1766+
->replaceArgument(0, $config['failure_transport']);
17481767
}
17491768

17501769
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+24-3Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,10 @@
8282
</service>
8383

8484
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
85-
<argument /> <!-- Message bus locator -->
8685
<argument type="service" id="messenger.receiver_locator" />
86+
<argument type="service" id="messenger.worker.worker_factory" />
8787
<argument type="service" id="logger" on-invalid="null" />
8888
<argument type="collection" /> <!-- Receiver names -->
89-
<argument type="service" id="messenger.retry_strategy_locator" />
90-
<argument type="service" id="event_dispatcher" />
9189
<call method="setCachePoolForRestartSignal">
9290
<argument type="service" id="cache.messenger.restart_workers_signal" />
9391
</call>
@@ -114,6 +112,29 @@
114112
<tag name="console.command" command="messenger:stop-workers" />
115113
</service>
116114

115+
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
116+
<argument /> <!-- Receiver name -->
117+
<argument /> <!-- Receiver locator -->
118+
<argument type="service" id="messenger.worker.worker_factory" />
119+
<argument type="service" id="event_dispatcher" />
120+
121+
<tag name="console.command" command="messenger:failed:retry" />
122+
</service>
123+
124+
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
125+
<argument /> <!-- Receiver name -->
126+
<argument /> <!-- Receiver locator -->
127+
128+
<tag name="console.command" command="messenger:failed:show" />
129+
</service>
130+
131+
<service id="console.command.messenger_failed_messages_purge" class="Symfony\Component\Messenger\Command\FailedMessagesPurgeCommand">
132+
<argument /> <!-- Receiver name -->
133+
<argument /> <!-- Receiver locator -->
134+
135+
<tag name="console.command" command="messenger:failed:purge" />
136+
</service>
137+
117138
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
118139
<argument type="service" id="router" />
119140
<argument type="service" id="debug.file_link_formatter" on-invalid="null" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+25-1Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99

1010
<!-- Asynchronous -->
1111
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
12-
<argument type="collection" /> <!-- Per message sender iterators -->
12+
<argument type="collection" /> <!-- Per message senders map -->
13+
<argument /> <!-- senders locator -->
1314
<argument type="collection" /> <!-- Messages to send and handle -->
1415
</service>
1516
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
@@ -87,5 +88,28 @@
8788
<argument /> <!-- multiplier -->
8889
<argument /> <!-- max delay ms -->
8990
</service>
91+
92+
<!-- failed handling -->
93+
<service id="messenger.failure.send_failed_message_to_failed_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailedTransportListener">
94+
<tag name="kernel.event_subscriber" />
95+
<tag name="monolog.logger" channel="messenger" />
96+
<argument /> <!-- Message bus locator -->
97+
<argument /> <!-- Failed transport name -->
98+
<argument type="service" id="logger" on-invalid="ignore" />
99+
</service>
100+
101+
<!-- worker -->
102+
<service id="messenger.worker.worker_factory" class="Symfony\Component\Messenger\Worker\WorkerFactory">
103+
<tag name="monolog.logger" channel="messenger" />
104+
<argument /> <!-- Message bus locator -->
105+
<argument type="service" id="messenger.retry_strategy_locator" />
106+
<argument type="service" id="event_dispatcher" />
107+
<argument type="service" id="logger" on-invalid="null" />
108+
</service>
109+
110+
<!-- routable event dispatcher -->
111+
<service id="messenger.routable_message_bus" class="Symfony\Component\Messenger\RoutableMessageBus">
112+
<argument /> <!-- Message bus locator -->
113+
</service>
90114
</services>
91115
</container>

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+4-2Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* [BC BREAK] `SendersLocatorInterface` has an additional method:
8+
`getSenderByAlias()`.
79
* Added optional `MessageCountAwareInterface` that receivers can implement
810
to give information about how many messages are waiting to be processed.
911
* [BC BREAK] The `Envelope::__construct()` signature changed:
@@ -32,8 +34,8 @@ CHANGELOG
3234
to the `Envelope` then find the correct bus when receiving from
3335
the transport. See `ConsumeMessagesCommand`.
3436
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
35-
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
36-
`ack()`, `reject()` and `get()`. The methods `receive()`
37+
* [BC BREAK] 4 new methods were added to `ReceiverInterface`:
38+
`ack()`, `reject()`, `get()` and `purge()`. The methods `receive()`
3739
and `stop()` were removed.
3840
* [BC BREAK] Error handling was moved from the receivers into
3941
`Worker`. Implementations of `ReceiverInterface::handle()`
+94Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Symfony\Component\Console\Command\Command;
15+
use Symfony\Component\Console\Style\SymfonyStyle;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\Stamp\ReceiverMessageIdStampInterface;
18+
use Symfony\Component\Messenger\Stamp\SentToFailedTransportStamp;
19+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
20+
21+
/**
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*
24+
* @internal
25+
* @experimental in 4.3
26+
*/
27+
abstract class AbstractFailedMessagesCommand extends Command
28+
{
29+
protected $receiverName;
30+
private $receiver;
31+
32+
public function __construct(string $receiverName, ReceiverInterface $receiver)
33+
{
34+
$this->receiverName = $receiverName;
35+
$this->receiver = $receiver;
36+
37+
parent::__construct();
38+
}
39+
40+
protected function getMessageId(Envelope $envelope)
41+
{
42+
foreach ($envelope->all() as $stamps) {
43+
// get the last stamp
44+
$stamp = end($stamps);
45+
if ($stamp instanceof ReceiverMessageIdStampInterface) {
46+
return $stamp->getId();
47+
}
48+
}
49+
}
50+
51+
protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
52+
{
53+
$io->title('Failed Message Details');
54+
55+
/** @var SentToFailedTransportStamp $sentToFailedTransportStamp */
56+
$sentToFailedTransportStamp = $envelope->last(SentToFailedTransportStamp::class);
57+
58+
$rows = [
59+
['Class', \get_class($envelope->getMessage())],
60+
];
61+
62+
if (null !== $id = $this->getMessageId($envelope)) {
63+
$rows[] = ['Message Id', $id];
64+
}
65+
66+
if (null === $sentToFailedTransportStamp) {
67+
$io->warning('Message does not appear to have been sent to this transport after failing');
68+
} else {
69+
$rows = array_merge($rows, [
70+
['Failed at', $sentToFailedTransportStamp->getSentAt()->format('Y-m-d H:i:s')],
71+
['Error', $sentToFailedTransportStamp->getExceptionMessage()],
72+
['Error Class', $sentToFailedTransportStamp->getFlattenException() ? $sentToFailedTransportStamp->getFlattenException()->getClass() : '(unknown)'],
73+
['Transport', $sentToFailedTransportStamp->getOriginalReceiverName()],
74+
]);
75+
}
76+
77+
$io->table([], $rows);
78+
79+
if ($io->isVeryVerbose()) {
80+
if (\function_exists('dump')) {
81+
dump($envelope->getMessage());
82+
}
83+
$io->writeln(' Trace:');
84+
$io->writeln($sentToFailedTransportStamp->getFlattenException()->getTraceAsString());
85+
} else {
86+
$io->writeln(' Re-run command with <info>-vv</info> to see more message & error details.');
87+
}
88+
}
89+
90+
protected function getReceiver(): ReceiverInterface
91+
{
92+
return $this->receiver;
93+
}
94+
}

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+5-29Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,11 @@
2323
use Symfony\Component\Console\Output\OutputInterface;
2424
use Symfony\Component\Console\Question\ChoiceQuestion;
2525
use Symfony\Component\Console\Style\SymfonyStyle;
26-
use Symfony\Component\Messenger\RoutableMessageBus;
27-
use Symfony\Component\Messenger\Worker;
2826
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
2927
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
3028
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
3129
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
32-
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
30+
use Symfony\Component\Messenger\Worker\WorkerFactory;
3331

3432
/**
3533
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -40,29 +38,19 @@ class ConsumeMessagesCommand extends Command
4038
{
4139
protected static $defaultName = 'messenger:consume';
4240

43-
private $busLocator;
4441
private $receiverLocator;
42+
private $workerFactory;
4543
private $logger;
4644
private $receiverNames;
47-
private $retryStrategyLocator;
48-
private $eventDispatcher;
4945
/** @var CacheItemPoolInterface|null */
5046
private $restartSignalCachePool;
5147

52-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
48+
public function __construct(ContainerInterface $receiverLocator, WorkerFactory $workerFactory, LoggerInterface $logger = null, array $receiverNames = [])
5349
{
54-
if (\is_array($retryStrategyLocator)) {
55-
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
56-
57-
$retryStrategyLocator = null;
58-
}
59-
60-
$this->busLocator = $busLocator;
6150
$this->receiverLocator = $receiverLocator;
51+
$this->workerFactory = $workerFactory;
6252
$this->logger = $logger;
6353
$this->receiverNames = $receiverNames;
64-
$this->retryStrategyLocator = $retryStrategyLocator;
65-
$this->eventDispatcher = $eventDispatcher;
6654

6755
parent::__construct();
6856
}
@@ -158,7 +146,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
158146
}
159147

160148
$receivers = [];
161-
$retryStrategies = [];
162149
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
163150
if (!$this->receiverLocator->has($receiverName)) {
164151
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -169,21 +156,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
169156
throw new RuntimeException($message);
170157
}
171158

172-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
173-
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
174-
}
175-
176159
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
177-
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
178-
}
179-
180-
if (null !== $input->getOption('bus')) {
181-
$bus = $this->busLocator->get($input->getOption('bus'));
182-
} else {
183-
$bus = new RoutableMessageBus($this->busLocator);
184160
}
185161

186-
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
162+
$worker = $this->workerFactory->createWorker($receivers, $input->getOption('bus'));
187163
$stopsWhen = [];
188164
if ($limit = $input->getOption('limit')) {
189165
$stopsWhen[] = "processed {$limit} messages";

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.