diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md
index 6be9bee7fadcf..1329387596e62 100644
--- a/src/Symfony/Component/Messenger/CHANGELOG.md
+++ b/src/Symfony/Component/Messenger/CHANGELOG.md
@@ -1,6 +1,11 @@
CHANGELOG
=========
+7.1
+---
+
+ * Add `--all` option to the `messenger:consume` command
+
7.0
---
diff --git a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
index 28ffee1c37752..129995de7b30b 100644
--- a/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+++ b/src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
@@ -34,6 +34,7 @@
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
use Symfony\Component\Messenger\RoutableMessageBus;
+use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
use Symfony\Component\Messenger\Worker;
/**
@@ -83,6 +84,7 @@ protected function configure(): void
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
+ new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
])
->setHelp(<<<'EOF'
The %command.name% command consumes messages and dispatches them to the message bus.
@@ -123,6 +125,10 @@ protected function configure(): void
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
php %command.full_name% --no-reset
+
+Use the --all option to consume from all receivers:
+
+ php %command.full_name% --all
EOF
)
;
@@ -132,6 +138,10 @@ protected function interact(InputInterface $input, OutputInterface $output): voi
{
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
+ if ($input->getOption('all')) {
+ return;
+ }
+
if ($this->receiverNames && !$input->getArgument('receivers')) {
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
@@ -155,7 +165,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$receivers = [];
$rateLimiters = [];
- foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
+ $receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
+ foreach ($receiverNames as $receiverName) {
if (!$this->receiverLocator->has($receiverName)) {
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
if ($this->receiverNames) {
@@ -165,7 +176,15 @@ protected function execute(InputInterface $input, OutputInterface $output): int
throw new RuntimeException($message);
}
- $receivers[$receiverName] = $this->receiverLocator->get($receiverName);
+ $receiver = $this->receiverLocator->get($receiverName);
+ if ($receiver instanceof SyncTransport) {
+ $idx = array_search($receiverName, $receiverNames);
+ unset($receiverNames[$idx]);
+
+ continue;
+ }
+
+ $receivers[$receiverName] = $receiver;
if ($this->rateLimiterLocator?->has($receiverName)) {
$rateLimiters[$receiverName] = $this->rateLimiterLocator->get($receiverName);
}
diff --git a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
index 0173052290047..40579ece6fa21 100644
--- a/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
@@ -214,6 +214,42 @@ public function testRunWithTimeLimit()
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
}
+ public function testRunWithAllOption()
+ {
+ $envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
+ $envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
+
+ $receiver1 = $this->createMock(ReceiverInterface::class);
+ $receiver1->expects($this->once())->method('get')->willReturn([$envelope1]);
+ $receiver2 = $this->createMock(ReceiverInterface::class);
+ $receiver2->expects($this->once())->method('get')->willReturn([$envelope2]);
+
+ $receiverLocator = $this->createMock(ContainerInterface::class);
+ $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver1')->willReturn(true);
+ $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver1')->willReturn($receiver1);
+ $receiverLocator->expects($this->once())->method('has')->with('dummy-receiver2')->willReturn(true);
+ $receiverLocator->expects($this->once())->method('get')->with('dummy-receiver2')->willReturn($receiver2);
+
+ $bus = $this->createMock(MessageBusInterface::class);
+ $bus->expects($this->exactly(2))->method('dispatch');
+
+ $busLocator = $this->createMock(ContainerInterface::class);
+ $busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
+ $busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
+
+ $command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
+
+ $application = new Application();
+ $application->add($command);
+ $tester = new CommandTester($application->get('messenger:consume'));
+ $tester->execute([
+ '--all' => null,
+ ]);
+
+ $tester->assertCommandIsSuccessful();
+ $this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver1, dummy-receiver2"', $tester->getDisplay());
+ }
+
/**
* @dataProvider provideCompletionSuggestions
*/