Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e512b7e

Browse filesBrowse files
committed
feature #30652 Fixing a bug where messenger:consume could send message to wrong bus (weaverryan)
This PR was merged into the 4.3-dev branch. Discussion ---------- Fixing a bug where messenger:consume could send message to wrong bus | Q | A | ------------- | --- | Branch? | master | Bug fix? | yes | New feature? | arguably, yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #30631 | License | MIT | Doc PR | Not needed This fixes #30631, where you can run `messener:consume` and accidentally sent received messages into the wrong bus. The fix (done via middleware) is to attach a "bus name" to the `Envelope` and use it when the message is received to find that bus. Commits ------- ef077cf Fixing a bug where a transport could receive a message and dispatch it to a different bus
2 parents e3970f9 + ef077cf commit e512b7e
Copy full SHA for e512b7e

File tree

13 files changed

+272
-36
lines changed
Filter options

13 files changed

+272
-36
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+12-2Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,8 +1616,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16161616
}
16171617

16181618
$defaultMiddleware = [
1619-
'before' => [['id' => 'dispatch_after_current_bus']],
1620-
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
1619+
'before' => [
1620+
['id' => 'add_bus_name_stamp_middleware'],
1621+
['id' => 'dispatch_after_current_bus'],
1622+
],
1623+
'after' => [
1624+
['id' => 'send_message'],
1625+
['id' => 'handle_message'],
1626+
],
16211627
];
16221628
foreach ($config['buses'] as $busId => $bus) {
16231629
$middleware = $bus['middleware'];
@@ -1628,6 +1634,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16281634
} else {
16291635
unset($defaultMiddleware['after'][1]['arguments']);
16301636
}
1637+
1638+
// argument to add_bus_name_stamp_middleware
1639+
$defaultMiddleware['before'][0]['arguments'] = [$busId];
1640+
16311641
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
16321642
}
16331643

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@
8181
<argument type="service" id="messenger.receiver_locator" />
8282
<argument type="service" id="logger" on-invalid="null" />
8383
<argument type="collection" /> <!-- Receiver names -->
84-
<argument type="collection" /> <!-- Message bus names -->
8584
<argument type="service" id="messenger.retry_strategy_locator" />
8685
<argument type="service" id="event_dispatcher" />
8786

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
43+
4244
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
4345

4446
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,13 +727,15 @@ public function testMessengerWithMultipleBuses()
727727
$this->assertTrue($container->has('messenger.bus.commands'));
728728
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
729729
$this->assertEquals([
730+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
730731
['id' => 'dispatch_after_current_bus'],
731732
['id' => 'send_message'],
732733
['id' => 'handle_message'],
733734
], $container->getParameter('messenger.bus.commands.middleware'));
734735
$this->assertTrue($container->has('messenger.bus.events'));
735736
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
736737
$this->assertEquals([
738+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
737739
['id' => 'dispatch_after_current_bus'],
738740
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
739741
['id' => 'send_message'],

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ CHANGELOG
33

44
4.3.0
55
-----
6-
6+
7+
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
8+
and `BusNameStamp` were added, which allow you to add a bus identifier
9+
to the `Envelope` then find the correct bus when receiving from
10+
the transport. See `ConsumeMessagesCommand`.
11+
* An optional `ConsumeMessagesCommand` constructor argument was removed.
712
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
813
`ack()` and `reject()`.
914
* [BC BREAK] Error handling was moved from the receivers into

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+16-29Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
2323
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
24+
use Symfony\Component\Messenger\RoutableMessageBus;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2627
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -39,17 +40,15 @@ class ConsumeMessagesCommand extends Command
3940
private $receiverLocator;
4041
private $logger;
4142
private $receiverNames;
42-
private $busNames;
4343
private $retryStrategyLocator;
4444
private $eventDispatcher;
4545

46-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
46+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], ContainerInterface $retryStrategyLocator = null, EventDispatcherInterface $eventDispatcher = null)
4747
{
4848
$this->busLocator = $busLocator;
4949
$this->receiverLocator = $receiverLocator;
5050
$this->logger = $logger;
5151
$this->receiverNames = $receiverNames;
52-
$this->busNames = $busNames;
5352
$this->retryStrategyLocator = $retryStrategyLocator;
5453
$this->eventDispatcher = $eventDispatcher;
5554

@@ -62,15 +61,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
6261
protected function configure(): void
6362
{
6463
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
65-
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
6664

6765
$this
6866
->setDefinition([
6967
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
7068
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7169
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7270
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
73-
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
71+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
7472
])
7573
->setDescription('Consumes messages')
7674
->setHelp(<<<'EOF'
@@ -89,6 +87,12 @@ protected function configure(): void
8987
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
9088
9189
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
90+
91+
Use the --bus option to specify the message bus to dispatch received messages
92+
to instead of trying to determine it automatically. This is required if the
93+
messages didn't originate from Messenger:
94+
95+
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
9296
EOF
9397
)
9498
;
@@ -112,24 +116,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
112116
}
113117
}
114118
}
115-
116-
$busName = $input->getOption('bus');
117-
if ($this->busNames && !$this->busLocator->has($busName)) {
118-
if (null === $busName) {
119-
$io->block('Missing bus argument.', null, 'error', ' ', true);
120-
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
121-
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
122-
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
123-
124-
if (1 === \count($alternatives)) {
125-
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
126-
$input->setOption('bus', $alternatives[0]);
127-
}
128-
} else {
129-
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
130-
}
131-
}
132-
}
133119
}
134120

135121
/**
@@ -147,18 +133,19 @@ protected function execute(InputInterface $input, OutputInterface $output): void
147133
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
148134
}
149135

150-
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
151-
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
152-
}
153-
154136
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
155137
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
156138
}
157139

158140
$receiver = $this->receiverLocator->get($receiverName);
159-
$bus = $this->busLocator->get($busName);
160141
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
161142

143+
if (null !== $input->getOption('bus')) {
144+
$bus = $this->busLocator->get($input->getOption('bus'));
145+
} else {
146+
$bus = new RoutableMessageBus($this->busLocator);
147+
}
148+
162149
$stopsWhen = [];
163150
if ($limit = $input->getOption('limit')) {
164151
$stopsWhen[] = "processed {$limit} messages";
@@ -176,7 +163,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
176163
}
177164

178165
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
179-
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
166+
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
180167

181168
if ($stopsWhen) {
182169
$last = array_pop($stopsWhen);

‎src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
+1-2Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
248248

249249
$container->getDefinition('console.command.messenger_consume_messages')
250250
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
251-
->replaceArgument(3, array_values($receiverNames))
252-
->replaceArgument(4, $busIds);
251+
->replaceArgument(3, array_values($receiverNames));
253252
}
254253

255254
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
+39Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
16+
17+
/**
18+
* Adds the BusNameStamp to the bus.
19+
*
20+
* @experimental in 4.3
21+
*
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*/
24+
class AddBusNameStampMiddleware implements MiddlewareInterface
25+
{
26+
private $busName;
27+
28+
public function __construct(string $busName)
29+
{
30+
$this->busName = $busName;
31+
}
32+
33+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
34+
{
35+
$envelope = $envelope->with(new BusNameStamp($this->busName));
36+
37+
return $stack->next()->handle($envelope, $stack);
38+
}
39+
}
+58Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
18+
/**
19+
* Bus of buses that is routable using a BusNameStamp.
20+
*
21+
* This is useful when passed to Worker: messages received
22+
* from the transport can be sent to the correct bus.
23+
*
24+
* @experimental in 4.3
25+
*
26+
* @author Ryan Weaver <ryan@symfonycasts.com>
27+
*/
28+
class RoutableMessageBus implements MessageBusInterface
29+
{
30+
private $busLocator;
31+
32+
/**
33+
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
34+
*/
35+
public function __construct(ContainerInterface $busLocator)
36+
{
37+
$this->busLocator = $busLocator;
38+
}
39+
40+
public function dispatch($envelope): Envelope
41+
{
42+
if (!$envelope instanceof Envelope) {
43+
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
44+
}
45+
46+
/** @var BusNameStamp $busNameStamp */
47+
$busNameStamp = $envelope->last(BusNameStamp::class);
48+
if (null === $busNameStamp) {
49+
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
50+
}
51+
52+
if (!$this->busLocator->has($busNameStamp->getBusName())) {
53+
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
54+
}
55+
56+
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
57+
}
58+
}
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp used to identify which bus it was passed to.
16+
*
17+
* @experimental in 4.3
18+
*
19+
* @author Ryan Weaver <ryan@symfonycasts.com>
20+
*/
21+
class BusNameStamp implements StampInterface
22+
{
23+
private $busName;
24+
25+
public function __construct(string $busName)
26+
{
27+
$this->busName = $busName;
28+
}
29+
30+
public function getBusName(): string
31+
{
32+
return $this->busName;
33+
}
34+
}

‎src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,6 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
263263
(new MessengerPass())->process($container);
264264

265265
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
266-
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
267266
}
268267

269268
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
18+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
19+
20+
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
21+
{
22+
public function testItSendsTheMessageToAssignedSender()
23+
{
24+
$middleware = new AddBusNameStampMiddleware('the_bus_name');
25+
$envelope = new Envelope(new DummyMessage('the message'));
26+
27+
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
28+
/** @var BusNameStamp $busNameStamp */
29+
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
30+
$this->assertNotNull($busNameStamp);
31+
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
32+
}
33+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.