From 9f72d08fff311ddc08487e81a220c1d6c6ce28a6 Mon Sep 17 00:00:00 2001 From: Nyholm Date: Mon, 27 Jan 2020 14:19:08 +0100 Subject: [PATCH] [Messenger] Add middleware to stop all workers on exception --- .../Resources/config/messenger.xml | 7 ++ .../StopWorkerOnExceptionMiddleware.php | 81 +++++++++++++++++++ 2 files changed, 88 insertions(+) create mode 100644 src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 14117ee8e40a4..2b2c086346dfd 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -56,6 +56,13 @@ + + + + + + + diff --git a/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php b/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php new file mode 100644 index 0000000000000..e1787fd9e39a6 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/StopWorkerOnExceptionMiddleware.php @@ -0,0 +1,81 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Psr\Cache\CacheItemPoolInterface; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; +use Symfony\Component\Messenger\Exception\HandlerFailedException; + +/** + * Stop all workers when an exceptions is thrown. + * + * @author Tobias Nyholm + */ +class StopWorkerOnExceptionMiddleware +{ + private $exceptions; + private $restartSignalCachePool; + + /** + * + * @param array $exceptions of fully qualified class names of exceptions + */ + public function __construct(array $exceptions) + { + $this->exceptions = array_values($exceptions); + } + + /** + * {@inheritdoc} + */ + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + try { + return $stack->next()->handle($envelope, $stack); + } catch (HandlerFailedException $e) { + if (count($this->exceptions) === 0) { + throw $e; + } + + if (count($this->exceptions) === 1 && $this->exceptions[0] === '*') { + $this->stopWorkers(); + throw $e; + } + + foreach ($e->getNestedExceptions() as $exception) { + if (in_array(get_class($exception), $this->exceptions)) { + $this->stopWorkers(); + break; + } + } + + throw $e; + } + } + + private function stopWorkers(): void + { + $cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY); + $cacheItem->set(microtime(true)); + $this->restartSignalCachePool->save($cacheItem); + } + + public function setRestartSignalCachePool(CacheItemPoolInterface $restartSignalCachePool): void + { + if ($this->restartSignalCachePool !== null) { + throw new \RuntimeException('Cannot update restartSignalCachePool dependency'); + } + + $this->restartSignalCachePool = $restartSignalCachePool; + } +}