Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 0539ca8

Browse filesBrowse files
bug #50787 [Messenger] Fix exiting messenger:failed:retry command (HypeMC)
This PR was merged into the 6.3 branch. Discussion ---------- [Messenger] Fix exiting `messenger:failed:retry` command | Q | A | ------------- | --- | Branch? | 6.3 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | - | License | MIT | Doc PR | - #49539 introduced a bug where it's impossible to exit the `messenger:failed:retry` command: ![Screenshot](https://github.com/symfony/symfony/assets/2445045/6d6d271b-b5f6-4d2f-a150-847ead22083b) `Ctrl+C` doesn't work because the `StopWorkerOnSignalsListener` handles the signal but doesn't actually exit the command, so the only way to currently exit the command is to kill it by force. Commits ------- cd6816b [Messenger] Fix exiting `FailedMessagesRetryCommand`
2 parents 4f085f8 + cd6816b commit 0539ca8
Copy full SHA for 0539ca8

File tree

5 files changed

+72
-9
lines changed
Filter options

5 files changed

+72
-9
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+10Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2098,6 +2098,16 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20982098
$container->getDefinition('messenger.transport.beanstalkd.factory')->addTag('messenger.transport_factory');
20992099
}
21002100

2101+
if ($config['stop_worker_on_signals'] && $this->hasConsole()) {
2102+
$container->getDefinition('console.command.messenger_consume_messages')
2103+
->replaceArgument(8, $config['stop_worker_on_signals']);
2104+
$container->getDefinition('console.command.messenger_failed_messages_retry')
2105+
->replaceArgument(6, $config['stop_worker_on_signals']);
2106+
}
2107+
2108+
if ($this->hasConsole() && $container->hasDefinition('messenger.listener.stop_worker_signals_listener')) {
2109+
$container->getDefinition('messenger.listener.stop_worker_signals_listener')->clearTag('kernel.event_subscriber');
2110+
}
21012111
if ($config['stop_worker_on_signals']) {
21022112
$container->getDefinition('messenger.listener.stop_worker_signals_listener')->replaceArgument(0, $config['stop_worker_on_signals']);
21032113
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
service('messenger.listener.reset_services')->nullOnInvalid(),
164164
[], // Bus names
165165
service('messenger.rate_limiter_locator')->nullOnInvalid(),
166+
null,
166167
])
167168
->tag('console.command')
168169
->tag('monolog.logger', ['channel' => 'messenger'])
@@ -194,6 +195,7 @@
194195
service('event_dispatcher'),
195196
service('logger')->nullOnInvalid(),
196197
service('messenger.transport.native_php_serializer')->nullOnInvalid(),
198+
null,
197199
])
198200
->tag('console.command')
199201
->tag('monolog.logger', ['channel' => 'messenger'])

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+31-4Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\Console\Attribute\AsCommand;
1717
use Symfony\Component\Console\Command\Command;
18+
use Symfony\Component\Console\Command\SignalableCommandInterface;
1819
use Symfony\Component\Console\Completion\CompletionInput;
1920
use Symfony\Component\Console\Completion\CompletionSuggestions;
2021
use Symfony\Component\Console\Exception\InvalidOptionException;
@@ -39,7 +40,7 @@
3940
* @author Samuel Roze <samuel.roze@gmail.com>
4041
*/
4142
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
42-
class ConsumeMessagesCommand extends Command
43+
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
4344
{
4445
private RoutableMessageBus $routableBus;
4546
private ContainerInterface $receiverLocator;
@@ -49,8 +50,10 @@ class ConsumeMessagesCommand extends Command
4950
private ?ResetServicesListener $resetServicesListener;
5051
private array $busIds;
5152
private ?ContainerInterface $rateLimiterLocator;
53+
private ?array $signals;
54+
private ?Worker $worker = null;
5255

53-
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null)
56+
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null, array $signals = null)
5457
{
5558
$this->routableBus = $routableBus;
5659
$this->receiverLocator = $receiverLocator;
@@ -60,6 +63,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface
6063
$this->resetServicesListener = $resetServicesListener;
6164
$this->busIds = $busIds;
6265
$this->rateLimiterLocator = $rateLimiterLocator;
66+
$this->signals = $signals;
6367

6468
parent::__construct();
6569
}
@@ -222,14 +226,19 @@ protected function execute(InputInterface $input, OutputInterface $output): int
222226

223227
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
224228

225-
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
229+
$this->worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
226230
$options = [
227231
'sleep' => $input->getOption('sleep') * 1000000,
228232
];
229233
if ($queues = $input->getOption('queues')) {
230234
$options['queues'] = $queues;
231235
}
232-
$worker->run($options);
236+
237+
try {
238+
$this->worker->run($options);
239+
} finally {
240+
$this->worker = null;
241+
}
233242

234243
return 0;
235244
}
@@ -247,6 +256,24 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
247256
}
248257
}
249258

259+
public function getSubscribedSignals(): array
260+
{
261+
return $this->signals ?? [\SIGTERM, \SIGINT];
262+
}
263+
264+
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
265+
{
266+
if (!$this->worker) {
267+
return false;
268+
}
269+
270+
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
271+
272+
$this->worker->stop();
273+
274+
return 0;
275+
}
276+
250277
private function convertToBytes(string $memoryLimit): int
251278
{
252279
$memoryLimit = strtolower($memoryLimit);

‎src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+27-4Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
16+
use Symfony\Component\Console\Command\SignalableCommandInterface;
1617
use Symfony\Component\Console\Exception\RuntimeException;
1718
use Symfony\Component\Console\Input\InputArgument;
1819
use Symfony\Component\Console\Input\InputInterface;
@@ -36,17 +37,20 @@
3637
* @author Ryan Weaver <ryan@symfonycasts.com>
3738
*/
3839
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
39-
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
40+
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
4041
{
4142
private EventDispatcherInterface $eventDispatcher;
4243
private MessageBusInterface $messageBus;
4344
private ?LoggerInterface $logger;
45+
private ?array $signals;
46+
private ?Worker $worker = null;
4447

45-
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null)
48+
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null, array $signals = null)
4649
{
4750
$this->eventDispatcher = $eventDispatcher;
4851
$this->messageBus = $messageBus;
4952
$this->logger = $logger;
53+
$this->signals = $signals;
5054

5155
parent::__construct($globalReceiverName, $failureTransports, $phpSerializer);
5256
}
@@ -123,6 +127,24 @@ protected function execute(InputInterface $input, OutputInterface $output): int
123127
return 0;
124128
}
125129

130+
public function getSubscribedSignals(): array
131+
{
132+
return $this->signals ?? [\SIGTERM, \SIGINT];
133+
}
134+
135+
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
136+
{
137+
if (!$this->worker) {
138+
return false;
139+
}
140+
141+
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
142+
143+
$this->worker->stop();
144+
145+
return 0;
146+
}
147+
126148
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
127149
{
128150
$receiver = $this->failureTransports->get($failureTransportName);
@@ -187,16 +209,17 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
187209
};
188210
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
189211

190-
$worker = new Worker(
212+
$this->worker = new Worker(
191213
[$failureTransportName => $receiver],
192214
$this->messageBus,
193215
$this->eventDispatcher,
194216
$this->logger
195217
);
196218

197219
try {
198-
$worker->run();
220+
$this->worker->run();
199221
} finally {
222+
$this->worker = null;
200223
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
201224
}
202225

‎src/Symfony/Component/Messenger/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/composer.json
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
},
2323
"require-dev": {
2424
"psr/cache": "^1.0|^2.0|^3.0",
25-
"symfony/console": "^5.4|^6.0",
25+
"symfony/console": "^6.3",
2626
"symfony/dependency-injection": "^5.4|^6.0",
2727
"symfony/deprecation-contracts": "^2.5|^3",
2828
"symfony/event-dispatcher": "^5.4|^6.0",
@@ -37,6 +37,7 @@
3737
"symfony/validator": "^5.4|^6.0"
3838
},
3939
"conflict": {
40+
"symfony/console": "<6.3",
4041
"symfony/event-dispatcher": "<5.4",
4142
"symfony/event-dispatcher-contracts": "<2.5",
4243
"symfony/framework-bundle": "<5.4",

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.