Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit dc330b0

Browse filesBrowse files
committed
feature #52411 [Messenger] Add --all option to messenger:consume (javaDeveloperKid)
This PR was squashed before being merged into the 7.1 branch. Discussion ---------- [Messenger] Add `--all` option to `messenger:consume` | Q | A | ------------- | --- | Branch? | 7.1 | Bug fix? | no | New feature? | yes <!-- please update src/**/CHANGELOG.md files --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Issues | Closes #52364 <!-- prefix each issue number with "Fix #", no need to create an issue if none exists, explain below instead --> | License | MIT When implementing this feature the problem with sync transports came out so the `if` statement for this was needed. I can see someone reported this 2 months ago in #51556. I think this be can fixed properly in a dedicated PR because it requires to dig into MessengerPass I guess. Commits ------- ae454e0 [Messenger] Add `--all` option to `messenger:consume`
2 parents 11f09e0 + ae454e0 commit dc330b0
Copy full SHA for dc330b0

File tree

3 files changed

+62
-2
lines changed
Filter options

3 files changed

+62
-2
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.1
5+
---
6+
7+
* Add `--all` option to the `messenger:consume` command
8+
49
7.0
510
---
611

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+21-2Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3535
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3636
use Symfony\Component\Messenger\RoutableMessageBus;
37+
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
3738
use Symfony\Component\Messenger\Worker;
3839

3940
/**
@@ -83,6 +84,7 @@ protected function configure(): void
8384
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
8485
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8586
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
87+
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
8688
])
8789
->setHelp(<<<'EOF'
8890
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -123,6 +125,10 @@ protected function configure(): void
123125
Use the --no-reset option to prevent services resetting after each message (may lead to leaking services' state between messages):
124126
125127
<info>php %command.full_name% <receiver-name> --no-reset</info>
128+
129+
Use the --all option to consume from all receivers:
130+
131+
<info>php %command.full_name% --all</info>
126132
EOF
127133
)
128134
;
@@ -132,6 +138,10 @@ protected function interact(InputInterface $input, OutputInterface $output): voi
132138
{
133139
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
134140

141+
if ($input->getOption('all')) {
142+
return;
143+
}
144+
135145
if ($this->receiverNames && !$input->getArgument('receivers')) {
136146
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
137147

@@ -155,7 +165,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
155165
{
156166
$receivers = [];
157167
$rateLimiters = [];
158-
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
168+
$receiverNames = $input->getOption('all') ? $this->receiverNames : $input->getArgument('receivers');
169+
foreach ($receiverNames as $receiverName) {
159170
if (!$this->receiverLocator->has($receiverName)) {
160171
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
161172
if ($this->receiverNames) {
@@ -165,7 +176,15 @@ protected function execute(InputInterface $input, OutputInterface $output): int
165176
throw new RuntimeException($message);
166177
}
167178

168-
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
179+
$receiver = $this->receiverLocator->get($receiverName);
180+
if ($receiver instanceof SyncTransport) {
181+
$idx = array_search($receiverName, $receiverNames);
182+
unset($receiverNames[$idx]);
183+
184+
continue;
185+
}
186+
187+
$receivers[$receiverName] = $receiver;
169188
if ($this->rateLimiterLocator?->has($receiverName)) {
170189
$rateLimiters[$receiverName] = $this->rateLimiterLocator->get($receiverName);
171190
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+36Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,42 @@ public function testRunWithTimeLimit()
214214
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver"', $tester->getDisplay());
215215
}
216216

217+
public function testRunWithAllOption()
218+
{
219+
$envelope1 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
220+
$envelope2 = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
221+
222+
$receiver1 = $this->createMock(ReceiverInterface::class);
223+
$receiver1->expects($this->once())->method('get')->willReturn([$envelope1]);
224+
$receiver2 = $this->createMock(ReceiverInterface::class);
225+
$receiver2->expects($this->once())->method('get')->willReturn([$envelope2]);
226+
227+
$receiverLocator = $this->createMock(ContainerInterface::class);
228+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver1')->willReturn(true);
229+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver1')->willReturn($receiver1);
230+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver2')->willReturn(true);
231+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver2')->willReturn($receiver2);
232+
233+
$bus = $this->createMock(MessageBusInterface::class);
234+
$bus->expects($this->exactly(2))->method('dispatch');
235+
236+
$busLocator = $this->createMock(ContainerInterface::class);
237+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
238+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
239+
240+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
241+
242+
$application = new Application();
243+
$application->add($command);
244+
$tester = new CommandTester($application->get('messenger:consume'));
245+
$tester->execute([
246+
'--all' => null,
247+
]);
248+
249+
$tester->assertCommandIsSuccessful();
250+
$this->assertStringContainsString('[OK] Consuming messages from transport "dummy-receiver1, dummy-receiver2"', $tester->getDisplay());
251+
}
252+
217253
/**
218254
* @dataProvider provideCompletionSuggestions
219255
*/

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.