Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 853449f

Browse filesBrowse files
author
Robin Chalas
committed
[Messenger] Inject RoutableMessageBus instead of bus locator
1 parent 0da2137 commit 853449f
Copy full SHA for 853449f

File tree

4 files changed

+102
-14
lines changed
Filter options

4 files changed

+102
-14
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
</service>
8383

8484
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
85-
<argument /> <!-- Message bus locator -->
85+
<argument type="service" id="messenger.routable_message_bus" />
8686
<argument type="service" id="messenger.receiver_locator" />
8787
<argument type="service" id="logger" on-invalid="null" />
8888
<argument type="collection" /> <!-- Receiver names -->

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+18-6Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
use Symfony\Component\Console\Output\OutputInterface;
2424
use Symfony\Component\Console\Question\ChoiceQuestion;
2525
use Symfony\Component\Console\Style\SymfonyStyle;
26+
use Symfony\Component\Messenger\Envelope;
2627
use Symfony\Component\Messenger\RoutableMessageBus;
28+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2729
use Symfony\Component\Messenger\Worker;
2830
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
2931
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
@@ -40,6 +42,7 @@ class ConsumeMessagesCommand extends Command
4042
{
4143
protected static $defaultName = 'messenger:consume';
4244

45+
private $routableBus;
4346
private $busLocator;
4447
private $receiverLocator;
4548
private $logger;
@@ -49,15 +52,24 @@ class ConsumeMessagesCommand extends Command
4952
/** @var CacheItemPoolInterface|null */
5053
private $restartSignalCachePool;
5154

52-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
55+
/**
56+
* @param RoutableMessageBus $routableBus
57+
*/
58+
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
5359
{
60+
// to be deprecated in 4.4
61+
if ($routableBus instanceof ContainerInterface) {
62+
$this->busLocator = $routableBus;
63+
$routableBus = null;
64+
}
65+
5466
if (\is_array($retryStrategyLocator)) {
5567
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
5668

5769
$retryStrategyLocator = null;
5870
}
5971

60-
$this->busLocator = $busLocator;
72+
$this->routableBus = $routableBus;
6173
$this->receiverLocator = $receiverLocator;
6274
$this->logger = $logger;
6375
$this->receiverNames = $receiverNames;
@@ -177,10 +189,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
177189
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
178190
}
179191

180-
if (null !== $input->getOption('bus')) {
181-
$bus = $this->busLocator->get($input->getOption('bus'));
182-
} else {
183-
$bus = new RoutableMessageBus($this->busLocator);
192+
if (null !== $busName = $input->getOption('bus')) {
193+
$bus = $this->routableBus ? $this->routableBus->getMessageBus($busName) : $this->busLocator->get($busName);
194+
} elseif (!$bus = $this->routableBus) {
195+
$bus = $this->routableBus ? $this->routableBus : new RoutableMessageBus($this->routableBus);
184196
}
185197

186198
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);

‎src/Symfony/Component/Messenger/RoutableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/RoutableMessageBus.php
+8-7Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ public function dispatch($envelope, array $stamps = []): Envelope
4545
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
4646
}
4747

48-
return $this->getMessageBus($envelope)->dispatch($envelope, $stamps);
49-
}
50-
51-
private function getMessageBus(Envelope $envelope): MessageBusInterface
52-
{
5348
/** @var BusNameStamp|null $busNameStamp */
5449
$busNameStamp = $envelope->last(BusNameStamp::class);
5550

@@ -58,11 +53,17 @@ private function getMessageBus(Envelope $envelope): MessageBusInterface
5853
throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.'));
5954
}
6055

61-
return $this->fallbackBus;
56+
return $this->fallbackBus->dispatch($envelope, $stamps);
6257
}
6358

64-
$busName = $busNameStamp->getBusName();
59+
return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps);
60+
}
6561

62+
/**
63+
* @internal
64+
*/
65+
public function getMessageBus(string $busName): MessageBusInterface
66+
{
6667
if (!$this->busLocator->has($busName)) {
6768
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
6869
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+75Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,18 @@
1212
namespace Symfony\Component\Messenger\Tests\Command;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Console\Application;
16+
use Symfony\Component\Console\Tester\CommandTester;
17+
use Symfony\Component\DependencyInjection\ContainerInterface;
1518
use Symfony\Component\DependencyInjection\ServiceLocator;
1619
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
20+
use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand;
21+
use Symfony\Component\Messenger\Envelope;
22+
use Symfony\Component\Messenger\MessageBusInterface;
23+
use Symfony\Component\Messenger\RoutableMessageBus;
24+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
25+
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
26+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1727

1828
class ConsumeMessagesCommandTest extends TestCase
1929
{
@@ -24,4 +34,69 @@ public function testConfigurationWithDefaultReceiver()
2434
$this->assertFalse($inputArgument->isRequired());
2535
$this->assertSame(['amqp'], $inputArgument->getDefault());
2636
}
37+
38+
public function testBasicRun()
39+
{
40+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
41+
42+
$receiver = $this->createMock(ReceiverInterface::class);
43+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
44+
45+
$receiverLocator = $this->createMock(ContainerInterface::class);
46+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
47+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
48+
49+
$bus = $this->createMock(MessageBusInterface::class);
50+
$bus->expects($this->once())->method('dispatch');
51+
52+
$busLocator = $this->createMock(ContainerInterface::class);
53+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
54+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
55+
56+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
57+
58+
$application = new Application();
59+
$application->add($command);
60+
$tester = new CommandTester($application->get('messenger:consume'));
61+
$tester->execute([
62+
'receivers' => ['dummy-receiver'],
63+
'--limit' => 1,
64+
]);
65+
66+
$this->assertSame(0, $tester->getStatusCode());
67+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
68+
}
69+
70+
public function testRunWithBusOption()
71+
{
72+
$envelope = new Envelope(new \stdClass());
73+
74+
$receiver = $this->createMock(ReceiverInterface::class);
75+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
76+
77+
$receiverLocator = $this->createMock(ContainerInterface::class);
78+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
79+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
80+
81+
$bus = $this->createMock(MessageBusInterface::class);
82+
$bus->expects($this->once())->method('dispatch');
83+
84+
$busLocator = $this->createMock(ContainerInterface::class);
85+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
86+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
87+
88+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
89+
90+
$application = new Application();
91+
$application->add($command);
92+
$tester = new CommandTester($application->get('messenger:consume'));
93+
$tester->execute([
94+
'receivers' => ['dummy-receiver'],
95+
'--bus' => 'dummy-bus',
96+
'--limit' => 1,
97+
]);
98+
99+
$this->assertSame(0, $tester->getStatusCode());
100+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
101+
}
27102
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.