Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 6c93002

Browse filesBrowse files
committed
bug #31748 [Messenger] Inject RoutableMessageBus instead of bus locator (chalasr)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] Inject RoutableMessageBus instead of bus locator | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #31741 | License | MIT | Doc PR | n/a Commits ------- 91817e4 [Messenger] Inject RoutableMessageBus instead of bus locator
2 parents 1318d3b + 91817e4 commit 6c93002
Copy full SHA for 6c93002

File tree

5 files changed

+159
-17
lines changed
Filter options

5 files changed

+159
-17
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
</service>
8383

8484
<service id="console.command.messenger_consume_messages" class="Symfony\Component\Messenger\Command\ConsumeMessagesCommand">
85-
<argument /> <!-- Message bus locator -->
85+
<argument type="service" id="messenger.routable_message_bus" />
8686
<argument type="service" id="messenger.receiver_locator" />
8787
<argument type="service" id="logger" on-invalid="null" />
8888
<argument type="collection" /> <!-- Receiver names -->

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+12-8Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ConsumeMessagesCommand extends Command
4040
{
4141
protected static $defaultName = 'messenger:consume';
4242

43-
private $busLocator;
43+
private $routableBus;
4444
private $receiverLocator;
4545
private $logger;
4646
private $receiverNames;
@@ -49,15 +49,23 @@ class ConsumeMessagesCommand extends Command
4949
/** @var CacheItemPoolInterface|null */
5050
private $restartSignalCachePool;
5151

52-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
52+
/**
53+
* @param RoutableMessageBus $routableBus
54+
*/
55+
public function __construct($routableBus, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], /* ContainerInterface */ $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
5356
{
57+
// to be deprecated in 4.4
58+
if ($routableBus instanceof ContainerInterface) {
59+
$routableBus = new RoutableMessageBus($routableBus);
60+
}
61+
5462
if (\is_array($retryStrategyLocator)) {
5563
@trigger_error(sprintf('The 5th argument of the class "%s" should be a retry-strategy locator, an array of bus names as a value is deprecated since Symfony 4.3.', __CLASS__), E_USER_DEPRECATED);
5664

5765
$retryStrategyLocator = null;
5866
}
5967

60-
$this->busLocator = $busLocator;
68+
$this->routableBus = $routableBus;
6169
$this->receiverLocator = $receiverLocator;
6270
$this->logger = $logger;
6371
$this->receiverNames = $receiverNames;
@@ -177,11 +185,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
177185
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
178186
}
179187

180-
if (null !== $input->getOption('bus')) {
181-
$bus = $this->busLocator->get($input->getOption('bus'));
182-
} else {
183-
$bus = new RoutableMessageBus($this->busLocator);
184-
}
188+
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
185189

186190
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
187191
$stopsWhen = [];

‎src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,6 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
260260

261261
if ($container->hasDefinition('console.command.messenger_consume_messages')) {
262262
$container->getDefinition('console.command.messenger_consume_messages')
263-
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
264263
->replaceArgument(3, array_values($receiverNames));
265264
}
266265

‎src/Symfony/Component/Messenger/RoutableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/RoutableMessageBus.php
+8-7Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,6 @@ public function dispatch($envelope, array $stamps = []): Envelope
4545
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
4646
}
4747

48-
return $this->getMessageBus($envelope)->dispatch($envelope, $stamps);
49-
}
50-
51-
private function getMessageBus(Envelope $envelope): MessageBusInterface
52-
{
5348
/** @var BusNameStamp|null $busNameStamp */
5449
$busNameStamp = $envelope->last(BusNameStamp::class);
5550

@@ -58,11 +53,17 @@ private function getMessageBus(Envelope $envelope): MessageBusInterface
5853
throw new InvalidArgumentException(sprintf('Envelope is missing a BusNameStamp and no fallback message bus is configured on RoutableMessageBus.'));
5954
}
6055

61-
return $this->fallbackBus;
56+
return $this->fallbackBus->dispatch($envelope, $stamps);
6257
}
6358

64-
$busName = $busNameStamp->getBusName();
59+
return $this->getMessageBus($busNameStamp->getBusName())->dispatch($envelope, $stamps);
60+
}
6561

62+
/**
63+
* @internal
64+
*/
65+
public function getMessageBus(string $busName): MessageBusInterface
66+
{
6667
if (!$this->busLocator->has($busName)) {
6768
throw new InvalidArgumentException(sprintf('Bus named "%s" does not exist.', $busName));
6869
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+138Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@
1212
namespace Symfony\Component\Messenger\Tests\Command;
1313

1414
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Console\Application;
16+
use Symfony\Component\Console\Tester\CommandTester;
17+
use Symfony\Component\DependencyInjection\ContainerInterface;
1518
use Symfony\Component\DependencyInjection\ServiceLocator;
1619
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
20+
use Symfony\Component\Messenger\Envelope;
21+
use Symfony\Component\Messenger\MessageBusInterface;
22+
use Symfony\Component\Messenger\RoutableMessageBus;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
24+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1725

1826
class ConsumeMessagesCommandTest extends TestCase
1927
{
@@ -24,4 +32,134 @@ public function testConfigurationWithDefaultReceiver()
2432
$this->assertFalse($inputArgument->isRequired());
2533
$this->assertSame(['amqp'], $inputArgument->getDefault());
2634
}
35+
36+
public function testBasicRun()
37+
{
38+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
39+
40+
$receiver = $this->createMock(ReceiverInterface::class);
41+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
42+
43+
$receiverLocator = $this->createMock(ContainerInterface::class);
44+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
45+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
46+
47+
$bus = $this->createMock(MessageBusInterface::class);
48+
$bus->expects($this->once())->method('dispatch');
49+
50+
$busLocator = $this->createMock(ContainerInterface::class);
51+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
52+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
53+
54+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
55+
56+
$application = new Application();
57+
$application->add($command);
58+
$tester = new CommandTester($application->get('messenger:consume'));
59+
$tester->execute([
60+
'receivers' => ['dummy-receiver'],
61+
'--limit' => 1,
62+
]);
63+
64+
$this->assertSame(0, $tester->getStatusCode());
65+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
66+
}
67+
68+
public function testRunWithBusOption()
69+
{
70+
$envelope = new Envelope(new \stdClass());
71+
72+
$receiver = $this->createMock(ReceiverInterface::class);
73+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
74+
75+
$receiverLocator = $this->createMock(ContainerInterface::class);
76+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
77+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
78+
79+
$bus = $this->createMock(MessageBusInterface::class);
80+
$bus->expects($this->once())->method('dispatch');
81+
82+
$busLocator = $this->createMock(ContainerInterface::class);
83+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
84+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
85+
86+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator);
87+
88+
$application = new Application();
89+
$application->add($command);
90+
$tester = new CommandTester($application->get('messenger:consume'));
91+
$tester->execute([
92+
'receivers' => ['dummy-receiver'],
93+
'--bus' => 'dummy-bus',
94+
'--limit' => 1,
95+
]);
96+
97+
$this->assertSame(0, $tester->getStatusCode());
98+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
99+
}
100+
101+
public function testBasicRunWithBusLocator()
102+
{
103+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
104+
105+
$receiver = $this->createMock(ReceiverInterface::class);
106+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
107+
108+
$receiverLocator = $this->createMock(ContainerInterface::class);
109+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
110+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
111+
112+
$bus = $this->createMock(MessageBusInterface::class);
113+
$bus->expects($this->once())->method('dispatch');
114+
115+
$busLocator = $this->createMock(ContainerInterface::class);
116+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
117+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
118+
119+
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
120+
121+
$application = new Application();
122+
$application->add($command);
123+
$tester = new CommandTester($application->get('messenger:consume'));
124+
$tester->execute([
125+
'receivers' => ['dummy-receiver'],
126+
'--limit' => 1,
127+
]);
128+
129+
$this->assertSame(0, $tester->getStatusCode());
130+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
131+
}
132+
133+
public function testRunWithBusOptionAndBusLocator()
134+
{
135+
$envelope = new Envelope(new \stdClass());
136+
137+
$receiver = $this->createMock(ReceiverInterface::class);
138+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
139+
140+
$receiverLocator = $this->createMock(ContainerInterface::class);
141+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
142+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
143+
144+
$bus = $this->createMock(MessageBusInterface::class);
145+
$bus->expects($this->once())->method('dispatch');
146+
147+
$busLocator = $this->createMock(ContainerInterface::class);
148+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
149+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
150+
151+
$command = new ConsumeMessagesCommand($busLocator, $receiverLocator);
152+
153+
$application = new Application();
154+
$application->add($command);
155+
$tester = new CommandTester($application->get('messenger:consume'));
156+
$tester->execute([
157+
'receivers' => ['dummy-receiver'],
158+
'--bus' => 'dummy-bus',
159+
'--limit' => 1,
160+
]);
161+
162+
$this->assertSame(0, $tester->getStatusCode());
163+
$this->assertContains('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
164+
}
27165
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.