Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Add option to stop the worker after a message failed #35453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ CHANGELOG
* Moved Doctrine transport to package `symfony/doctrine-messenger`. All classes in `Symfony\Component\Messenger\Transport\Doctrine` have been moved to `Symfony\Component\Messenger\Bridge\Doctrine\Transport`
* Moved RedisExt transport to package `symfony/redis-messenger`. All classes in `Symfony\Component\Messenger\Transport\RedisExt` have been moved to `Symfony\Component\Messenger\Bridge\Redis\Transport`
* Added support for passing a `\Throwable` argument to `RetryStrategyInterface` methods. This allows to define strategies based on the reason of the handling failure.
* Added `StopWorkerOnFailureLimitListener` to stop the worker after a specified amount of failed messages is reached.

5.0.0
-----
Expand Down
11 changes: 11 additions & 0 deletions 11 src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Symfony\Component\Console\Question\ChoiceQuestion;
use Symfony\Component\Console\Style\SymfonyStyle;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
Expand Down Expand Up @@ -64,6 +65,7 @@ protected function configure(): void
->setDefinition([
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
new InputOption('failure-limit', 'f', InputOption::VALUE_REQUIRED, 'The number of failed messages the worker can consume'),
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
Expand All @@ -82,6 +84,10 @@ protected function configure(): void
Use the --limit option to limit the number of messages received:

<info>php %command.full_name% <receiver-name> --limit=10</info>

Use the --failure-limit option to stop the worker when the given number of failed messages is reached:

<info>php %command.full_name% <receiver-name> --failure-limit=2</info>

Use the --memory-limit option to stop the worker if it exceeds a given memory usage limit. You can use shorthand byte values [K, M or G]:

Expand Down Expand Up @@ -152,6 +158,11 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener($limit, $this->logger));
}

if ($failureLimit = $input->getOption('failure-limit')) {
$stopsWhen[] = "reached {$failureLimit} failed messages";
$this->eventDispatcher->addSubscriber(new StopWorkerOnFailureLimitListener($failureLimit, $this->logger));
}

if ($memoryLimit = $input->getOption('memory-limit')) {
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
$this->eventDispatcher->addSubscriber(new StopWorkerOnMemoryLimitListener($this->convertToBytes($memoryLimit), $this->logger));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\EventListener;

use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;

/**
* @author Michel Hunziker <info@michelhunziker.com>
*/
class StopWorkerOnFailureLimitListener implements EventSubscriberInterface
{
private $maximumNumberOfFailures;
private $logger;
private $failedMessages = 0;

public function __construct(int $maximumNumberOfFailures, LoggerInterface $logger = null)
{
$this->maximumNumberOfFailures = $maximumNumberOfFailures;
$this->logger = $logger;

if ($maximumNumberOfFailures <= 0) {
throw new InvalidArgumentException('Failure limit must be greater than zero.');
}
}

public function onMessageFailed(WorkerMessageFailedEvent $event): void
{
++$this->failedMessages;
}

public function onWorkerRunning(WorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle() && $this->failedMessages >= $this->maximumNumberOfFailures) {
$this->failedMessages = 0;
$event->getWorker()->stop();

if (null !== $this->logger) {
$this->logger->info('Worker stopped due to limit of {count} failed message(s) is reached', ['count' => $this->maximumNumberOfFailures]);
}
}
}

public static function getSubscribedEvents(): array
{
return [
WorkerMessageFailedEvent::class => 'onMessageFailed',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Tests\EventListener;

use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Worker;
use Throwable;

class StopWorkerOnFailureLimitListenerTest extends TestCase
{
/**
* @dataProvider countProvider
*/
public function testWorkerStopsWhenMaximumCountReached(int $max, bool $shouldStop): void
{
$worker = $this->createMock(Worker::class);
$worker->expects($shouldStop ? $this->atLeastOnce() : $this->never())->method('stop');

$failedEvent = $this->createFailedEvent();
$runningEvent = new WorkerRunningEvent($worker, false);

$failureLimitListener = new StopWorkerOnFailureLimitListener($max);
// simulate three messages (of which 2 failed)
$failureLimitListener->onMessageFailed($failedEvent);
$failureLimitListener->onWorkerRunning($runningEvent);

$failureLimitListener->onWorkerRunning($runningEvent);

$failureLimitListener->onMessageFailed($failedEvent);
$failureLimitListener->onWorkerRunning($runningEvent);
}

public function countProvider(): iterable
{
yield [1, true];
yield [2, true];
yield [3, false];
yield [4, false];
}

public function testWorkerLogsMaximumCountReachedWhenLoggerIsGiven(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('info')
->with(
$this->equalTo('Worker stopped due to limit of {count} failed message(s) is reached'),
$this->equalTo(['count' => 1])
);

$worker = $this->createMock(Worker::class);
$event = new WorkerRunningEvent($worker, false);

$failureLimitListener = new StopWorkerOnFailureLimitListener(1, $logger);
$failureLimitListener->onMessageFailed($this->createFailedEvent());
$failureLimitListener->onWorkerRunning($event);
}

private function createFailedEvent(): WorkerMessageFailedEvent
{
$envelope = new Envelope(new DummyMessage('hello'));

return new WorkerMessageFailedEvent($envelope, 'default', $this->createMock(Throwable::class));
}
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.