Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit dc95fa5

Browse filesBrowse files
author
Robin Chalas
committed
[Messenger] Inject RoutableMessageBus instead of bus locator
1 parent 0da2137 commit dc95fa5
Copy full SHA for dc95fa5

File tree

4 files changed

+98
-14
lines changed
Filter options

4 files changed

+98
-14
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
</service>
8383

8484
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
85-
<argument /> <!-- Message bus locator -->
85+
<argument type="service" id="messenger.routable_message_bus" />
8686
<argument type="service" id="messenger.receiver_locator" />
8787
<argument type="service" id="logger" on-invalid="null" />
8888
<argument type="collection" /> <!-- Receiver names -->

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+16-6Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ class ConsumeMessagesCommand extends Command
4040
{
4141
protected static $defaultName = 'messenger:consume';
4242

43+
private $routableBus;
4344
private $busLocator;
4445
private $receiverLocator;
4546
private $logger;
@@ -49,15 +50,24 @@ class ConsumeMessagesCommand extends Command
4950
/** @var CacheItemPoolInterface|null */
5051
private $restartSignalCachePool;
5152

52-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
53+
/**
54+
* @param RoutableMessageBus $routableBus
55+
*/
56+
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
5357
{
58+
// to be deprecated in 4.4
59+
if ($routableBus instanceof ContainerInterface) {
60+
$this->busLocator = $routableBus;
61+
$routableBus = null;
62+
}
63+
5464
if (\is_array($retryStrategyLocator)) {
5565
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
5666

5767
$retryStrategyLocator = null;
5868
}
5969

60-
$this->busLocator = $busLocator;
70+
$this->routableBus = $routableBus;
6171
$this->receiverLocator = $receiverLocator;
6272
$this->logger = $logger;
6373
$this->receiverNames = $receiverNames;
@@ -177,10 +187,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
177187
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
178188
}
179189

180-
if (null !== $input->getOption('bus')) {
181-
$bus = $this->busLocator->get($input->getOption('bus'));
182-
} else {
183-
$bus = new RoutableMessageBus($this->busLocator);
190+
if (null !== $busName = $input->getOption('bus')) {
191+
$bus = $this->routableBus ? $this->routableBus->getMessageBus($busName) : $this->busLocator->get($busName);
192+
} elseif (!$bus = $this->routableBus) {
193+
$bus = $this->routableBus ? $this->routableBus : new RoutableMessageBus($this->routableBus);
184194
}
185195

186196
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);

‎src/Symfony/Component/Messenger/RoutableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/RoutableMessageBus.php
+8-7Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ public function dispatch($envelope, array $stamps = []): Envelope
4545
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
4646
}
4747

48-
return $this->getMessageBus($envelope)->dispatch($envelope, $stamps);
49-
}
50-
51-
private function getMessageBus(Envelope $envelope): MessageBusInterface
52-
{
5348
/** @var BusNameStamp|null $busNameStamp */
5449
$busNameStamp = $envelope->last(BusNameStamp::class);
5550

@@ -58,11 +53,17 @@ private function getMessageBus(Envelope $envelope): MessageBusInterface
5853
throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.'));
5954
}
6055

61-
return $this->fallbackBus;
56+
return $this->fallbackBus->dispatch($envelope, $stamps);
6257
}
6358

64-
$busName = $busNameStamp->getBusName();
59+
return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps);
60+
}
6561

62+
/**
63+
* @internal
64+
*/
65+
public function getMessageBus(string $busName): MessageBusInterface
66+
{
6667
if (!$this->busLocator->has($busName)) {
6768
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
6869
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+73Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@
1212
namespace Symfony\Component\Messenger\Tests\Command;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Console\Application;
16+
use Symfony\Component\Console\Tester\CommandTester;
17+
use Symfony\Component\DependencyInjection\ContainerInterface;
1518
use Symfony\Component\DependencyInjection\ServiceLocator;
1619
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
20+
use Symfony\Component\Messenger\Envelope;
21+
use Symfony\Component\Messenger\MessageBusInterface;
22+
use Symfony\Component\Messenger\RoutableMessageBus;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
24+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1725

1826
class ConsumeMessagesCommandTest extends TestCase
1927
{
@@ -24,4 +32,69 @@ public function testConfigurationWithDefaultReceiver()
2432
$this->assertFalse($inputArgument->isRequired());
2533
$this->assertSame(['amqp'], $inputArgument->getDefault());
2634
}
35+
36+
public function testBasicRun()
37+
{
38+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
39+
40+
$receiver = $this->createMock(ReceiverInterface::class);
41+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
42+
43+
$receiverLocator = $this->createMock(ContainerInterface::class);
44+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
45+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
46+
47+
$bus = $this->createMock(MessageBusInterface::class);
48+
$bus->expects($this->once())->method('dispatch');
49+
50+
$busLocator = $this->createMock(ContainerInterface::class);
51+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
52+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
53+
54+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
55+
56+
$application = new Application();
57+
$application->add($command);
58+
$tester = new CommandTester($application->get('messenger:consume'));
59+
$tester->execute([
60+
'receivers' => ['dummy-receiver'],
61+
'--limit' => 1,
62+
]);
63+
64+
$this->assertSame(0, $tester->getStatusCode());
65+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
66+
}
67+
68+
public function testRunWithBusOption()
69+
{
70+
$envelope = new Envelope(new \stdClass());
71+
72+
$receiver = $this->createMock(ReceiverInterface::class);
73+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
74+
75+
$receiverLocator = $this->createMock(ContainerInterface::class);
76+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
77+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
78+
79+
$bus = $this->createMock(MessageBusInterface::class);
80+
$bus->expects($this->once())->method('dispatch');
81+
82+
$busLocator = $this->createMock(ContainerInterface::class);
83+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
84+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
85+
86+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
87+
88+
$application = new Application();
89+
$application->add($command);
90+
$tester = new CommandTester($application->get('messenger:consume'));
91+
$tester->execute([
92+
'receivers' => ['dummy-receiver'],
93+
'--bus' => 'dummy-bus',
94+
'--limit' => 1,
95+
]);
96+
97+
$this->assertSame(0, $tester->getStatusCode());
98+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
99+
}
27100
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.