Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c0fc67e

Browse filesBrowse files
committed
Adding a command to stop all worker processes
1 parent f5ebe36 commit c0fc67e
Copy full SHA for c0fc67e

File tree

8 files changed

+279
-0
lines changed
Filter options

8 files changed

+279
-0
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/cache.xml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@
3636
<tag name="cache.pool" />
3737
</service>
3838

39+
<service id="cache.messenger" parent="cache.app" public="false">
40+
<tag name="cache.pool" />
41+
</service>
42+
3943
<service id="cache.adapter.system" class="Symfony\Component\Cache\Adapter\AdapterInterface" abstract="true">
4044
<factory class="Symfony\Component\Cache\Adapter\AbstractAdapter" method="createSystemCache" />
4145
<tag name="cache.pool" clearer="cache.system_clearer" />

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@
8383
<argument type="collection" /> <!-- Receiver names -->
8484
<argument type="service" id="messenger.retry_strategy_locator" />
8585
<argument type="service" id="event_dispatcher" />
86+
<call method="setCachePoolForRestartSignal">
87+
<argument type="service" id="cache.messenger" />
88+
</call>
8689

8790
<tag name="console.command" command="messenger:consume" />
8891
<tag name="console.command" command="messenger:consume-messages" />
@@ -101,6 +104,11 @@
101104
<tag name="console.command" command="debug:messenger" />
102105
</service>
103106

107+
<service id="console.command.messenger_stop_workers" class="Symfony\Component\Messenger\Command\StopWorkersCommand">
108+
<argument type="service" id="cache.messenger" />
109+
<tag name="console.command" command="messenger:stop-workers" />
110+
</service>
111+
104112
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
105113
<argument type="service" id="router" />
106114
<tag name="console.command" command="debug:router" />

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* Added new `messenger:stop-workers` command that sends a signal
8+
to stop all `messenger:consume` workers.
79
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
810
and `BusNameStamp` were added, which allow you to add a bus identifier
911
to the `Envelope` then find the correct bus when receiving from

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+14Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Command;
1313

14+
use Psr\Cache\CacheItemPoolInterface;
1415
use Psr\Container\ContainerInterface;
1516
use Psr\Log\LoggerInterface;
1617
use Symfony\Component\Console\Command\Command;
@@ -25,6 +26,7 @@
2526
use Symfony\Component\Messenger\Worker;
2627
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
2728
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
29+
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
2830
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
2931
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
3032

@@ -43,6 +45,8 @@ class ConsumeMessagesCommand extends Command
4345
private $receiverNames;
4446
private $retryStrategyLocator;
4547
private $eventDispatcher;
48+
/** @var CacheItemPoolInterface|null */
49+
private $restartSignalCachePool;
4650

4751
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4852
{
@@ -62,6 +66,11 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
6266
parent::__construct();
6367
}
6468

69+
public function setCachePoolForRestartSignal(CacheItemPoolInterface $restartSignalCachePool)
70+
{
71+
$this->restartSignalCachePool = $restartSignalCachePool;
72+
}
73+
6574
/**
6675
* {@inheritdoc}
6776
*/
@@ -190,6 +199,11 @@ protected function execute(InputInterface $input, OutputInterface $output): void
190199
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
191200
}
192201

202+
if (null !== $this->restartSignalCachePool) {
203+
$stopsWhen[] = 'received a stop signal via the messenger:stop-workers command';
204+
$worker = new StopWhenRestartSignalIsReceived($worker, $this->restartSignalCachePool, $this->logger);
205+
}
206+
193207
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
194208
$io->success(sprintf('Consuming messages from transport%s "%s".', \count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
195209

+73Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Command;
13+
14+
use Psr\Cache\CacheItemPoolInterface;
15+
use Symfony\Component\Console\Command\Command;
16+
use Symfony\Component\Console\Input\InputInterface;
17+
use Symfony\Component\Console\Output\OutputInterface;
18+
use Symfony\Component\Console\Style\SymfonyStyle;
19+
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
20+
21+
/**
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*
24+
* @experimental in 4.3
25+
*/
26+
class StopWorkersCommand extends Command
27+
{
28+
protected static $defaultName = 'messenger:stop-workers';
29+
30+
private $restartSignalCachePool;
31+
32+
public function __construct(CacheItemPoolInterface $restartSignalCachePool)
33+
{
34+
$this->restartSignalCachePool = $restartSignalCachePool;
35+
36+
parent::__construct();
37+
}
38+
39+
/**
40+
* {@inheritdoc}
41+
*/
42+
protected function configure(): void
43+
{
44+
$this
45+
->setDefinition([])
46+
->setDescription('Stops workers after their current message')
47+
->setHelp(<<<'EOF'
48+
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
49+
50+
<info>php %command.full_name%</info>
51+
52+
Each worker command will finish the message they are currently processing
53+
and then exit. Worker commands are *not* automatically restarted: that
54+
should be handled by something like supervisord.
55+
EOF
56+
)
57+
;
58+
}
59+
60+
/**
61+
* {@inheritdoc}
62+
*/
63+
protected function execute(InputInterface $input, OutputInterface $output): void
64+
{
65+
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
66+
67+
$cacheItem = $this->restartSignalCachePool->getItem(StopWhenRestartSignalIsReceived::LAST_RESTART_CACHE_KEY);
68+
$cacheItem->set(time());
69+
$this->restartSignalCachePool->save($cacheItem);
70+
71+
$io->success('Signal successfully sent to stop any running workers.');
72+
}
73+
}
+35Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Command;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Cache\CacheItemInterface;
16+
use Psr\Cache\CacheItemPoolInterface;
17+
use Symfony\Component\Console\Tester\CommandTester;
18+
use Symfony\Component\Messenger\Command\StopWorkersCommand;
19+
20+
class StopWorkersCommandTest extends TestCase
21+
{
22+
public function testItSetsCacheItem()
23+
{
24+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
25+
$cacheItem = $this->createMock(CacheItemInterface::class);
26+
$cacheItem->expects($this->once())->method('set');
27+
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
28+
$cachePool->expects($this->once())->method('save')->with($cacheItem);
29+
30+
$command = new StopWorkersCommand($cachePool);
31+
32+
$tester = new CommandTester($command);
33+
$tester->execute([]);
34+
}
35+
}
+71Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Worker;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Cache\CacheItemInterface;
16+
use Psr\Cache\CacheItemPoolInterface;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Tests\Fixtures\DummyWorker;
19+
use Symfony\Component\Messenger\Worker\StopWhenRestartSignalIsReceived;
20+
21+
/**
22+
* @group time-sensitive
23+
*/
24+
class StopWhenRestartSignalIsReceivedTest extends TestCase
25+
{
26+
/**
27+
* @dataProvider restartTimeProvider
28+
*/
29+
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
30+
{
31+
$decoratedWorker = new DummyWorker([
32+
new Envelope(new \stdClass()),
33+
]);
34+
35+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
36+
$cacheItem = $this->createMock(CacheItemInterface::class);
37+
$cacheItem->expects($this->once())->method('isHIt')->willReturn(true);
38+
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
39+
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
40+
41+
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
42+
$stopOnSignalWorker->run();
43+
44+
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
45+
}
46+
47+
public function restartTimeProvider()
48+
{
49+
yield [null, false]; // no cached restart time, do not restart
50+
yield [+10, true]; // 10 seconds after starting, a restart was requested
51+
yield [-10, false]; // a restart was requested, but 10 seconds before we started
52+
}
53+
54+
public function testWorkerDoesNotStopIfRestartNotInCache()
55+
{
56+
$decoratedWorker = new DummyWorker([
57+
new Envelope(new \stdClass()),
58+
]);
59+
60+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
61+
$cacheItem = $this->createMock(CacheItemInterface::class);
62+
$cacheItem->expects($this->once())->method('isHIt')->willReturn(false);
63+
$cacheItem->expects($this->never())->method('get');
64+
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
65+
66+
$stopOnSignalWorker = new StopWhenRestartSignalIsReceived($decoratedWorker, $cachePool);
67+
$stopOnSignalWorker->run();
68+
69+
$this->assertFalse($decoratedWorker->isStopped());
70+
}
71+
}
+72Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Worker;
13+
14+
use Psr\Cache\CacheItemPoolInterface;
15+
use Psr\Log\LoggerInterface;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\WorkerInterface;
18+
19+
/**
20+
* @author Ryan Weaver <ryan@symfonycasts.com>
21+
*
22+
* @experimental in 4.3
23+
*/
24+
class StopWhenRestartSignalIsReceived implements WorkerInterface
25+
{
26+
public const LAST_RESTART_CACHE_KEY = 'workers.last_restart';
27+
28+
private $decoratedWorker;
29+
private $cachePool;
30+
private $logger;
31+
32+
public function __construct(WorkerInterface $decoratedWorker, CacheItemPoolInterface $cachePool, LoggerInterface $logger = null)
33+
{
34+
$this->decoratedWorker = $decoratedWorker;
35+
$this->cachePool = $cachePool;
36+
$this->logger = $logger;
37+
}
38+
39+
public function run(array $options = [], callable $onHandledCallback = null): void
40+
{
41+
$workerStartedTimestamp = time();
42+
43+
$this->decoratedWorker->run($options, function (?Envelope $envelope) use ($onHandledCallback, $workerStartedTimestamp) {
44+
if (null !== $onHandledCallback) {
45+
$onHandledCallback($envelope);
46+
}
47+
48+
if ($this->shouldRestart($workerStartedTimestamp)) {
49+
$this->stop();
50+
if (null !== $this->logger) {
51+
$this->logger->info('Worker stopped because a restart was requested.');
52+
}
53+
}
54+
});
55+
}
56+
57+
public function stop(): void
58+
{
59+
$this->decoratedWorker->stop();
60+
}
61+
62+
private function shouldRestart(int $workerStartedAt)
63+
{
64+
$cacheItem = $this->cachePool->getItem(self::LAST_RESTART_CACHE_KEY);
65+
if (!$cacheItem->isHit()) {
66+
// no restart has ever been scheduled
67+
return false;
68+
}
69+
70+
return $workerStartedAt < $cacheItem->get();
71+
}
72+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.