diff --git a/.github/composer-config.json b/.github/composer-config.json index 01c998e5ed672..1b82f7c5db002 100644 --- a/.github/composer-config.json +++ b/.github/composer-config.json @@ -4,6 +4,7 @@ "preferred-install": { "symfony/form": "source", "symfony/http-kernel": "source", + "symfony/messenger": "source", "symfony/notifier": "source", "symfony/validator": "source", "*": "dist" diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md index 6364525877b0b..34bb9547ea9d3 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md @@ -4,7 +4,8 @@ CHANGELOG 5.3 --- -* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0. + * Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0. + * `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues. 5.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php index 009e7be8d55bb..84630fb28e03f 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php @@ -16,7 +16,7 @@ use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; -use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; +use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -25,7 +25,7 @@ * * @author Samuel Roze */ -class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface +class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface { private $serializer; private $connection; @@ -41,7 +41,15 @@ public function __construct(Connection $connection, SerializerInterface $seriali */ public function get(): iterable { - foreach ($this->connection->getQueueNames() as $queueName) { + yield from $this->getFromQueues($this->connection->getQueueNames()); + } + + /** + * {@inheritdoc} + */ + public function getFromQueues(array $queueNames): iterable + { + foreach ($queueNames as $queueName) { yield from $this->getEnvelope($queueName); } } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json b/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json index ee21fa3097d3a..b5a4f04132187 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json @@ -18,7 +18,7 @@ "require": { "php": ">=7.2.5", "symfony/deprecation-contracts": "^2.1", - "symfony/messenger": "^5.1" + "symfony/messenger": "^5.3" }, "require-dev": { "symfony/event-dispatcher": "^4.4|^5.0", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 6df405b68eb57..4cdf02d8d254c 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG --- * `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`. + * Added `queues` option to `Worker` to only fetch messages from a specific queue from a receiver implementing `QueueReceiverInterface`. 5.2.0 ----- diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php index b289a29c0ef40..b2a4faef1f410 100644 --- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php @@ -71,6 +71,7 @@ protected function configure(): void new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'), new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1), new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'), + new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'), ]) ->setDescription(self::$defaultDescription) ->setHelp(<<<'EOF' @@ -104,6 +105,10 @@ protected function configure(): void messages didn't originate from Messenger: php %command.full_name% --bus=event_bus + +Use the --queues option to limit a receiver to only certain queues (only supported by some receivers): + + php %command.full_name% --queues=fasttrack EOF ) ; @@ -195,9 +200,13 @@ protected function execute(InputInterface $input, OutputInterface $output) $bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus; $worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger); - $worker->run([ + $options = [ 'sleep' => $input->getOption('sleep') * 1000000, - ]); + ]; + if ($queues = $input->getOption('queues')) { + $options['queues'] = $queues; + } + $worker->run($options); return 0; } diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index c105cdad5348c..e47974610b997 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -21,12 +21,14 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; use Symfony\Component\Messenger\Stamp\SentStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Worker; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -245,6 +247,41 @@ public function testWorkerWithMultipleReceivers() $this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes); } + public function testWorkerLimitQueues() + { + $envelope = [new Envelope(new DummyMessage('message1'))]; + $receiver = $this->createMock(QueueReceiverInterface::class); + $receiver->expects($this->once()) + ->method('getFromQueues') + ->with(['foo']) + ->willReturn($envelope) + ; + $receiver->expects($this->never()) + ->method('get') + ; + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $dispatcher = new EventDispatcher(); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $worker = new Worker(['transport' => $receiver], $bus, $dispatcher); + $worker->run(['queues' => ['foo']]); + } + + public function testWorkerLimitQueuesUnsupported() + { + $receiver1 = $this->createMock(QueueReceiverInterface::class); + $receiver2 = $this->createMock(ReceiverInterface::class); + + $bus = $this->getMockBuilder(MessageBusInterface::class)->getMock(); + + $worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus); + $this->expectException(RuntimeException::class); + $this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class)); + $worker->run(['queues' => ['foo']]); + } + public function testWorkerMessageReceivedEventMutability() { $envelope = new Envelope(new DummyMessage('Hello')); diff --git a/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php b/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php new file mode 100644 index 0000000000000..0248ac621c453 --- /dev/null +++ b/src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php @@ -0,0 +1,33 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Transport\Receiver; + +use Symfony\Component\Messenger\Envelope; + +/** + * Some transports may have multiple queues. This interface is used to read from only some queues. + * + * @author David Buchmann + * + * @experimental in 5.3 + */ +interface QueueReceiverInterface extends ReceiverInterface +{ + /** + * Get messages from the specified queue names instead of consuming from all queues. + * + * @param string[] $queueNames + * + * @return Envelope[] + */ + public function getFromQueues(array $queueNames): iterable; +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 6f8d34b1cb2cf..f13edcc2f5a05 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -22,8 +22,10 @@ use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; +use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; @@ -57,6 +59,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis * * Valid options are: * * sleep (default: 1000000): Time in microseconds to sleep after no messages are found + * * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface */ public function run(array $options = []): void { @@ -65,11 +68,25 @@ public function run(array $options = []): void $options = array_merge([ 'sleep' => 1000000, ], $options); + $queueNames = $options['queues'] ?? false; + + if ($queueNames) { + // if queue names are specified, all receivers must implement the QueueReceiverInterface + foreach ($this->receivers as $transportName => $receiver) { + if (!$receiver instanceof QueueReceiverInterface) { + throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class)); + } + } + } while (false === $this->shouldStop) { $envelopeHandled = false; foreach ($this->receivers as $transportName => $receiver) { - $envelopes = $receiver->get(); + if ($queueNames) { + $envelopes = $receiver->getFromQueues($queueNames); + } else { + $envelopes = $receiver->get(); + } foreach ($envelopes as $envelope) { $envelopeHandled = true;