Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 54c97a4

Browse filesBrowse files
committed
[Messenger] Fix exiting FailedMessagesRetryCommand
1 parent e0b8655 commit 54c97a4
Copy full SHA for 54c97a4

File tree

5 files changed

+69
-10
lines changed
Filter options

5 files changed

+69
-10
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2098,6 +2098,17 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20982098
$container->getDefinition('messenger.transport.beanstalkd.factory')->addTag('messenger.transport_factory');
20992099
}
21002100

2101+
if ($this->hasConsole()) {
2102+
$container->getDefinition('messenger.listener.stop_worker_signals_listener')->clearTag('kernel.event_subscriber');
2103+
2104+
if ($config['stop_worker_on_signals']) {
2105+
$container->getDefinition('console.command.messenger_consume_messages')
2106+
->replaceArgument(8, $config['stop_worker_on_signals']);
2107+
$container->getDefinition('console.command.messenger_failed_messages_retry')
2108+
->replaceArgument(6, $config['stop_worker_on_signals']);
2109+
}
2110+
}
2111+
21012112
if ($config['stop_worker_on_signals']) {
21022113
$container->getDefinition('messenger.listener.stop_worker_signals_listener')->replaceArgument(0, $config['stop_worker_on_signals']);
21032114
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
+4-1Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@
163163
service('messenger.listener.reset_services')->nullOnInvalid(),
164164
[], // Bus names
165165
service('messenger.rate_limiter_locator')->nullOnInvalid(),
166+
null,
166167
])
167168
->tag('console.command')
168169
->tag('monolog.logger', ['channel' => 'messenger'])
@@ -192,10 +193,12 @@
192193
abstract_arg('Receivers'),
193194
service('messenger.routable_message_bus'),
194195
service('event_dispatcher'),
195-
service('logger'),
196+
service('logger')->nullOnInvalid(),
196197
service('messenger.transport.native_php_serializer')->nullOnInvalid(),
198+
null,
197199
])
198200
->tag('console.command')
201+
->tag('monolog.logger', ['channel' => 'messenger'])
199202

200203
->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
201204
->args([

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+26-4Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\Console\Attribute\AsCommand;
1717
use Symfony\Component\Console\Command\Command;
18+
use Symfony\Component\Console\Command\SignalableCommandInterface;
1819
use Symfony\Component\Console\Completion\CompletionInput;
1920
use Symfony\Component\Console\Completion\CompletionSuggestions;
2021
use Symfony\Component\Console\Exception\InvalidOptionException;
@@ -39,7 +40,7 @@
3940
* @author Samuel Roze <samuel.roze@gmail.com>
4041
*/
4142
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
42-
class ConsumeMessagesCommand extends Command
43+
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
4344
{
4445
private RoutableMessageBus $routableBus;
4546
private ContainerInterface $receiverLocator;
@@ -49,8 +50,10 @@ class ConsumeMessagesCommand extends Command
4950
private ?ResetServicesListener $resetServicesListener;
5051
private array $busIds;
5152
private ?ContainerInterface $rateLimiterLocator;
53+
private ?array $signals;
54+
private Worker $worker;
5255

53-
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null)
56+
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [], ResetServicesListener $resetServicesListener = null, array $busIds = [], ContainerInterface $rateLimiterLocator = null, array $signals = null)
5457
{
5558
$this->routableBus = $routableBus;
5659
$this->receiverLocator = $receiverLocator;
@@ -60,6 +63,7 @@ public function __construct(RoutableMessageBus $routableBus, ContainerInterface
6063
$this->resetServicesListener = $resetServicesListener;
6164
$this->busIds = $busIds;
6265
$this->rateLimiterLocator = $rateLimiterLocator;
66+
$this->signals = $signals;
6367

6468
parent::__construct();
6569
}
@@ -222,14 +226,14 @@ protected function execute(InputInterface $input, OutputInterface $output): int
222226

223227
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
224228

225-
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
229+
$this->worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger, $rateLimiters);
226230
$options = [
227231
'sleep' => $input->getOption('sleep') * 1000000,
228232
];
229233
if ($queues = $input->getOption('queues')) {
230234
$options['queues'] = $queues;
231235
}
232-
$worker->run($options);
236+
$this->worker->run($options);
233237

234238
return 0;
235239
}
@@ -247,6 +251,24 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
247251
}
248252
}
249253

254+
public function getSubscribedSignals(): array
255+
{
256+
return $this->signals ?? [\SIGTERM, \SIGINT];
257+
}
258+
259+
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
260+
{
261+
if (!isset($this->worker)) {
262+
return false;
263+
}
264+
265+
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
266+
267+
$this->worker->stop();
268+
269+
return 0;
270+
}
271+
250272
private function convertToBytes(string $memoryLimit): int
251273
{
252274
$memoryLimit = strtolower($memoryLimit);

‎src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php
+26-4Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Psr\Log\LoggerInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
16+
use Symfony\Component\Console\Command\SignalableCommandInterface;
1617
use Symfony\Component\Console\Exception\RuntimeException;
1718
use Symfony\Component\Console\Input\InputArgument;
1819
use Symfony\Component\Console\Input\InputInterface;
@@ -36,17 +37,20 @@
3637
* @author Ryan Weaver <ryan@symfonycasts.com>
3738
*/
3839
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
39-
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
40+
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
4041
{
4142
private EventDispatcherInterface $eventDispatcher;
4243
private MessageBusInterface $messageBus;
4344
private ?LoggerInterface $logger;
45+
private ?array $signals;
46+
private Worker $worker;
4447

45-
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null)
48+
public function __construct(?string $globalReceiverName, ServiceProviderInterface $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, PhpSerializer $phpSerializer = null, array $signals = null)
4649
{
4750
$this->eventDispatcher = $eventDispatcher;
4851
$this->messageBus = $messageBus;
4952
$this->logger = $logger;
53+
$this->signals = $signals;
5054

5155
parent::__construct($globalReceiverName, $failureTransports, $phpSerializer);
5256
}
@@ -123,6 +127,24 @@ protected function execute(InputInterface $input, OutputInterface $output): int
123127
return 0;
124128
}
125129

130+
public function getSubscribedSignals(): array
131+
{
132+
return $this->signals ?? [\SIGTERM, \SIGINT];
133+
}
134+
135+
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
136+
{
137+
if (!isset($this->worker)) {
138+
return false;
139+
}
140+
141+
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
142+
143+
$this->worker->stop();
144+
145+
return 0;
146+
}
147+
126148
private function runInteractive(string $failureTransportName, SymfonyStyle $io, bool $shouldForce): void
127149
{
128150
$receiver = $this->failureTransports->get($failureTransportName);
@@ -187,15 +209,15 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece
187209
};
188210
$this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);
189211

190-
$worker = new Worker(
212+
$this->worker = new Worker(
191213
[$failureTransportName => $receiver],
192214
$this->messageBus,
193215
$this->eventDispatcher,
194216
$this->logger
195217
);
196218

197219
try {
198-
$worker->run();
220+
$this->worker->run();
199221
} finally {
200222
$this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener);
201223
}

‎src/Symfony/Component/Messenger/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/composer.json
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
},
2323
"require-dev": {
2424
"psr/cache": "^1.0|^2.0|^3.0",
25-
"symfony/console": "^5.4|^6.0",
25+
"symfony/console": "^6.3",
2626
"symfony/dependency-injection": "^5.4|^6.0",
2727
"symfony/event-dispatcher": "^5.4|^6.0",
2828
"symfony/http-kernel": "^5.4|^6.0",
@@ -36,6 +36,7 @@
3636
"symfony/validator": "^5.4|^6.0"
3737
},
3838
"conflict": {
39+
"symfony/console": "<6.3",
3940
"symfony/event-dispatcher": "<5.4",
4041
"symfony/event-dispatcher-contracts": "<2.5",
4142
"symfony/framework-bundle": "<5.4",

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.