diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index 9345a71b97a10..d820cb7f571d8 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -250,6 +250,10 @@ protected function execute(InputInterface $input, OutputInterface $output): int $options['queues'] = $queues; } + if (!$this->getApplication()->getAlarmInterval() && $keepAliveInterval = $this->worker->keepaliveInterval()) { + $this->getApplication()->setAlarmInterval($keepAliveInterval); + } + try { $this->worker->run($options); } finally { diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 47bcd1463a915..3827a8ef94a02 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -250,10 +250,16 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece $this->logger ); + $currentKeepaliveInterval = $this->getApplication()->getAlarmInterval(); + if (!$currentKeepaliveInterval && $keepAliveInterval = $this->worker->keepaliveInterval()) { + $this->getApplication()->setAlarmInterval($keepAliveInterval); + } + try { $this->worker->run(); } finally { $this->worker = null; + $this->getApplication()->setAlarmInterval($currentKeepaliveInterval); $this->eventDispatcher->removeListener(WorkerMessageReceivedEvent::class, $listener); } diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php index b978b2a5db2c8..9573f081e8f3f 100644 --- a/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php +++ b/src/Symfony/Component/Messenger/Transport/Receiver/KeepaliveReceiverInterface.php @@ -14,6 +14,9 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +/** + * @method int|null getKeepaliveInterval() Interval to schedule a SIGALRM signal in seconds. + */ interface KeepaliveReceiverInterface extends ReceiverInterface { /** diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 14b30ba5645bf..a2c1c2d292dc1 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -308,6 +308,23 @@ public function keepalive(?int $seconds): void } } + public function keepaliveInterval(): ?int + { + $minSeconds = null; + foreach ($this->receivers as $receiver) { + if (!$receiver instanceof KeepaliveReceiverInterface || !method_exists($receiver, 'getKeepaliveInterval')) { + continue; + } + + $keepaliveInterval = $receiver->getKeepaliveInterval(); + if ($keepaliveInterval && $keepaliveInterval > 0) { + $minSeconds = min($minSeconds ?? $keepaliveInterval, $keepaliveInterval); + } + } + + return $minSeconds; + } + public function getMetadata(): WorkerMetadata { return $this->metadata;