Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d7e0f98

Browse filesBrowse files
committed
[Messenger] extract worker logic to listener and sent messages for retry
and failure directly to transport instead of redispatching on the bus
1 parent cf10c02 commit d7e0f98
Copy full SHA for d7e0f98

32 files changed

+332
-602
lines changed

‎UPGRADE-4.4.md

Copy file name to clipboardExpand all lines: UPGRADE-4.4.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,11 @@ Messenger
154154

155155
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
156156
pass a `RoutableMessageBus` instance instead.
157+
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
158+
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
159+
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
160+
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
161+
* [BC BREAK] Removed `UnknownSenderException`.
157162

158163
Mime
159164
----

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+13-4Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1810,20 +1810,29 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
18101810
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
18111811
}
18121812

1813+
$sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences);
1814+
18131815
$container->getDefinition('messenger.senders_locator')
18141816
->replaceArgument(0, $messageToSendersMapping)
1815-
->replaceArgument(1, ServiceLocatorTagPass::register($container, $senderReferences))
1817+
->replaceArgument(1, $sendersServiceLocator)
1818+
;
1819+
1820+
$container->getDefinition('messenger.retry.send_failed_message_for_retry_listener')
1821+
->replaceArgument(0, $sendersServiceLocator)
18161822
;
18171823

18181824
$container->getDefinition('messenger.retry_strategy_locator')
18191825
->replaceArgument(0, $transportRetryReferences);
18201826

18211827
if ($config['failure_transport']) {
1828+
if (!isset($senderReferences[$config['failure_transport']])) {
1829+
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
1830+
}
1831+
18221832
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
1823-
->replaceArgument(1, $config['failure_transport']);
1833+
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
18241834
$container->getDefinition('console.command.messenger_failed_messages_retry')
1825-
->replaceArgument(0, $config['failure_transport'])
1826-
->replaceArgument(4, $transportRetryReferences[$config['failure_transport']] ?? null);
1835+
->replaceArgument(0, $config['failure_transport']);
18271836
$container->getDefinition('console.command.messenger_failed_messages_show')
18281837
->replaceArgument(0, $config['failure_transport']);
18291838
$container->getDefinition('console.command.messenger_failed_messages_remove')

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+3-5Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@
8686
<argument type="service" id="messenger.receiver_locator" />
8787
<argument type="service" id="logger" on-invalid="null" />
8888
<argument type="collection" /> <!-- Receiver names -->
89-
<argument type="service" id="messenger.retry_strategy_locator" />
9089
<argument type="service" id="event_dispatcher" />
9190
<call method="setCachePoolForRestartSignal">
9291
<argument type="service" id="cache.messenger.restart_workers_signal" />
@@ -116,25 +115,24 @@
116115

117116
<service id="console.command.messenger_failed_messages_retry" class="Symfony\Component\Messenger\Command\FailedMessagesRetryCommand">
118117
<argument /> <!-- Receiver name -->
119-
<argument /> <!-- Receiver locator -->
118+
<argument /> <!-- Receiver -->
120119
<argument type="service" id="messenger.routable_message_bus" />
121120
<argument type="service" id="event_dispatcher" />
122-
<argument /> <!-- Retry strategy -->
123121
<argument type="service" id="logger" />
124122

125123
<tag name="console.command" command="messenger:failed:retry" />
126124
</service>
127125

128126
<service id="console.command.messenger_failed_messages_show" class="Symfony\Component\Messenger\Command\FailedMessagesShowCommand">
129127
<argument /> <!-- Receiver name -->
130-
<argument /> <!-- Receiver locator -->
128+
<argument /> <!-- Receiver -->
131129

132130
<tag name="console.command" command="messenger:failed:show" />
133131
</service>
134132

135133
<service id="console.command.messenger_failed_messages_remove" class="Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand">
136134
<argument /> <!-- Receiver name -->
137-
<argument /> <!-- Receiver locator -->
135+
<argument /> <!-- Receiver -->
138136

139137
<tag name="console.command" command="messenger:failed:remove" />
140138
</service>

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+10-3Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<!-- Asynchronous -->
1111
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
1212
<argument type="collection" /> <!-- Per message senders map -->
13-
<argument /> <!-- senders locator -->
13+
<argument /> <!-- senders service locator -->
1414
</service>
1515
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
1616
<tag name="monolog.logger" channel="messenger" />
@@ -98,12 +98,19 @@
9898
<argument /> <!-- max delay ms -->
9999
</service>
100100

101+
<service id="messenger.retry.send_failed_message_for_retry_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener">
102+
<tag name="kernel.event_subscriber" />
103+
<tag name="monolog.logger" channel="messenger" />
104+
<argument /> <!-- senders service locator -->
105+
<argument type="service" id="messenger.retry_strategy_locator" />
106+
<argument type="service" id="logger" on-invalid="ignore" />
107+
</service>
108+
101109
<!-- failed handling -->
102110
<service id="messenger.failure.send_failed_message_to_failure_transport_listener" class="Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener">
103111
<tag name="kernel.event_subscriber" />
104112
<tag name="monolog.logger" channel="messenger" />
105-
<argument type="service" id="messenger.routable_message_bus" />
106-
<argument /> <!-- Failure transport name -->
113+
<argument /> <!-- Failure transport -->
107114
<argument type="service" id="logger" on-invalid="ignore" />
108115
</service>
109116

‎src/Symfony/Bundle/FrameworkBundle/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/composer.json
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"symfony/http-client": "^4.4|^5.0",
4646
"symfony/lock": "^4.4|^5.0",
4747
"symfony/mailer": "^4.4|^5.0",
48-
"symfony/messenger": "^4.3.6|^5.0",
48+
"symfony/messenger": "^4.4|^5.0",
4949
"symfony/mime": "^4.4|^5.0",
5050
"symfony/process": "^3.4|^4.0|^5.0",
5151
"symfony/security-csrf": "^3.4|^4.0|^5.0",
@@ -77,7 +77,7 @@
7777
"symfony/form": "<4.3",
7878
"symfony/lock": "<4.4",
7979
"symfony/mailer": "<4.4",
80-
"symfony/messenger": "<4.3.6",
80+
"symfony/messenger": "<4.4",
8181
"symfony/mime": "<4.4",
8282
"symfony/property-info": "<3.4",
8383
"symfony/security-bundle": "<4.4",

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,12 @@ CHANGELOG
1111
* Made all dispatched worker event classes final.
1212
* Added support for `from_transport` attribute on `messenger.message_handler` tag.
1313
* Added support for passing `dbindex` as a query parameter to the redis transport DSN.
14+
* [BC BREAK] Removed `SendersLocatorInterface::getSenderByAlias` added in 4.3.
15+
* [BC BREAK] Removed `$retryStrategies` argument from `Worker::__construct`.
16+
* [BC BREAK] Removed `$retryStrategyLocator` argument from `ConsumeMessagesCommand::__construct`.
17+
* [BC BREAK] Removed `$senderClassOrAlias` argument from `RedeliveryStamp::__construct`.
18+
* [BC BREAK] Removed `UnknownSenderException`.
19+
* The component is not marked as `@experimental` anymore.
1420

1521
4.3.0
1622
-----

‎src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ protected function displaySingleMessage(Envelope $envelope, SymfonyStyle $io)
8989
$redeliveryStamps = $envelope->all(RedeliveryStamp::class);
9090
$io->writeln(' Message history:');
9191
foreach ($redeliveryStamps as $redeliveryStamp) {
92-
$io->writeln(sprintf(' * Message failed and redelivered to the <info>%s</info> transport at <info>%s</info>', $redeliveryStamp->getSenderClassOrAlias(), $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
92+
$io->writeln(sprintf(' * Message failed at <info>%s</info> and was redelivered', $redeliveryStamp->getRedeliveredAt()->format('Y-m-d H:i:s')));
9393
}
9494
$io->newLine();
9595

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+5-13Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,14 @@ class ConsumeMessagesCommand extends Command
4242
private $receiverLocator;
4343
private $logger;
4444
private $receiverNames;
45-
private $retryStrategyLocator;
4645
private $eventDispatcher;
4746
/** @var CacheItemPoolInterface|null */
4847
private $restartSignalCachePool;
4948

5049
/**
5150
* @param RoutableMessageBus $routableBus
5251
*/
53-
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
52+
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* EventDispatcherInterface */ $eventDispatcher = null)
5453
{
5554
if ($routableBus instanceof ContainerInterface) {
5655
@trigger_error(sprintf('Passing a "%s" instance as first argument to "%s()" is deprecated since Symfony 4.4, pass a "%s" instance instead.', ContainerInterface::class, __METHOD__, RoutableMessageBus::class), E_USER_DEPRECATED);
@@ -59,17 +58,16 @@ public function __construct($routableBus, ContainerInterface $receiverLocator, L
5958
throw new \TypeError(sprintf('The first argument must be an instance of "%s".', RoutableMessageBus::class));
6059
}
6160

62-
if (\is_array($retryStrategyLocator)) {
63-
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
61+
if (null !== $eventDispatcher && !$eventDispatcher instanceof EventDispatcherInterface) {
62+
@trigger_error(sprintf('The 5th argument of the class "%s" should be a "%s"', __CLASS__, EventDispatcherInterface::class), E_USER_DEPRECATED);
6463

65-
$retryStrategyLocator = null;
64+
$eventDispatcher = null;
6665
}
6766

6867
$this->routableBus = $routableBus;
6968
$this->receiverLocator = $receiverLocator;
7069
$this->logger = $logger;
7170
$this->receiverNames = $receiverNames;
72-
$this->retryStrategyLocator = $retryStrategyLocator;
7371
$this->eventDispatcher = $eventDispatcher;
7472

7573
parent::__construct();
@@ -166,7 +164,6 @@ protected function execute(InputInterface $input, OutputInterface $output)
166164
}
167165

168166
$receivers = [];
169-
$retryStrategies = [];
170167
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
171168
if (!$this->receiverLocator->has($receiverName)) {
172169
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
@@ -177,17 +174,12 @@ protected function execute(InputInterface $input, OutputInterface $output)
177174
throw new RuntimeException($message);
178175
}
179176

180-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
181-
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
182-
}
183-
184177
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
185-
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
186178
}
187179

188180
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
189181

190-
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
182+
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
191183
$stopsWhen = [];
192184
if ($limit = $input->getOption('limit')) {
193185
$stopsWhen[] = "processed {$limit} messages";

‎src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+1-5Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
2525
use Symfony\Component\Messenger\Exception\LogicException;
2626
use Symfony\Component\Messenger\MessageBusInterface;
27-
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2827
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2928
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3029
use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver;
@@ -39,14 +38,12 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
3938

4039
private $eventDispatcher;
4140
private $messageBus;
42-
private $retryStrategy;
4341
private $logger;
4442

45-
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, RetryStrategyInterface $retryStrategy = null, LoggerInterface $logger = null)
43+
public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null)
4644
{
4745
$this->eventDispatcher = $eventDispatcher;
4846
$this->messageBus = $messageBus;
49-
$this->retryStrategy = $retryStrategy;
5047
$this->logger = $logger;
5148

5249
parent::__construct($receiverName, $receiver);
@@ -180,7 +177,6 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $
180177
$worker = new Worker(
181178
[$this->getReceiverName() => $receiver],
182179
$this->messageBus,
183-
[$this->getReceiverName() => $this->retryStrategy],
184180
$this->eventDispatcher,
185181
$this->logger
186182
);

‎src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Event/WorkerMessageFailedEvent.php
+7-3Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,11 @@
2121
final class WorkerMessageFailedEvent extends AbstractWorkerMessageEvent
2222
{
2323
private $throwable;
24-
private $willRetry;
24+
private $willRetry = false;
2525

26-
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error, bool $willRetry)
26+
public function __construct(Envelope $envelope, string $receiverName, \Throwable $error)
2727
{
2828
$this->throwable = $error;
29-
$this->willRetry = $willRetry;
3029

3130
parent::__construct($envelope, $receiverName);
3231
}
@@ -40,4 +39,9 @@ public function willRetry(): bool
4039
{
4140
return $this->willRetry;
4241
}
42+
43+
public function setForRetry(): void
44+
{
45+
$this->willRetry = true;
46+
}
4347
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.