Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a4f9bf3

Browse filesBrowse files
committed
Fixing a bug where a transport could receive a message and dispatch it to a different bus
1 parent 522594a commit a4f9bf3
Copy full SHA for a4f9bf3

File tree

11 files changed

+269
-34
lines changed
Filter options

11 files changed

+269
-34
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+12-2Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,8 +1616,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16161616
}
16171617

16181618
$defaultMiddleware = [
1619-
'before' => [['id' => 'dispatch_after_current_bus']],
1620-
'after' => [['id' => 'send_message'], ['id' => 'handle_message']],
1619+
'before' => [
1620+
['id' => 'add_bus_name_stamp_middleware'],
1621+
['id' => 'dispatch_after_current_bus'],
1622+
],
1623+
'after' => [
1624+
['id' => 'send_message'],
1625+
['id' => 'handle_message'],
1626+
],
16211627
];
16221628
foreach ($config['buses'] as $busId => $bus) {
16231629
$middleware = $bus['middleware'];
@@ -1628,6 +1634,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
16281634
} else {
16291635
unset($defaultMiddleware['after'][1]['arguments']);
16301636
}
1637+
1638+
// argument to add_bus_name_stamp_middleware
1639+
$defaultMiddleware['before'][0]['arguments'] = [$busId];
1640+
16311641
$middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
16321642
}
16331643

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
</call>
4040
</service>
4141

42+
<service id="messenger.middleware.add_bus_name_stamp_middleware" class="Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware" abstract="true" />
43+
4244
<service id="messenger.middleware.dispatch_after_current_bus" class="Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware" />
4345

4446
<service id="messenger.middleware.validation" class="Symfony\Component\Messenger\Middleware\ValidationMiddleware">

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ CHANGELOG
44
4.3.0
55
-----
66

7+
* New classes: `RoutableMessageBus`, `AddBusNameStampMiddleware`
8+
and `BusNameStamp` were added, which allow you to add a bus identifier
9+
to the `Envelope` then find the correct bus when receiving from
10+
the transport. See `ConsumeMessagesCommand`.
11+
* An optional `ConsumeMessagesCommand` constructor argument was removed.
712
* Added `PhpSerializer` which uses PHP's native `serialize()` and
813
`unserialize()` to serialize messages to a transport
914
* [BC BREAK] If no serializer were passed, the default serializer

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+16-29Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
23+
use Symfony\Component\Messenger\RoutableMessageBus;
2324
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
2425
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
2526
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
@@ -38,15 +39,13 @@ class ConsumeMessagesCommand extends Command
3839
private $receiverLocator;
3940
private $logger;
4041
private $receiverNames;
41-
private $busNames;
4242

43-
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [], array $busNames = [])
43+
public function __construct(ContainerInterface $busLocator, ContainerInterface $receiverLocator, LoggerInterface $logger = null, array $receiverNames = [])
4444
{
4545
$this->busLocator = $busLocator;
4646
$this->receiverLocator = $receiverLocator;
4747
$this->logger = $logger;
4848
$this->receiverNames = $receiverNames;
49-
$this->busNames = $busNames;
5049

5150
parent::__construct();
5251
}
@@ -57,15 +56,14 @@ public function __construct(ContainerInterface $busLocator, ContainerInterface $
5756
protected function configure(): void
5857
{
5958
$defaultReceiverName = 1 === \count($this->receiverNames) ? current($this->receiverNames) : null;
60-
$defaultBusName = 1 === \count($this->busNames) ? current($this->busNames) : null;
6159

6260
$this
6361
->setDefinition([
6462
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
6563
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
6664
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
6765
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
68-
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched', $defaultBusName),
66+
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically.'),
6967
])
7068
->setDescription('Consumes messages')
7169
->setHelp(<<<'EOF'
@@ -84,6 +82,12 @@ protected function configure(): void
8482
Use the --time-limit option to stop the worker when the given time limit (in seconds) is reached:
8583
8684
<info>php %command.full_name% <receiver-name> --time-limit=3600</info>
85+
86+
Use the --bus option to specify the message bus to dispatch received messages
87+
to instead of trying to determine it automatically. This is required if the
88+
messages didn't originate from Messenger:
89+
90+
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
8791
EOF
8892
)
8993
;
@@ -107,24 +111,6 @@ protected function interact(InputInterface $input, OutputInterface $output)
107111
}
108112
}
109113
}
110-
111-
$busName = $input->getOption('bus');
112-
if ($this->busNames && !$this->busLocator->has($busName)) {
113-
if (null === $busName) {
114-
$io->block('Missing bus argument.', null, 'error', ' ', true);
115-
$input->setOption('bus', $io->choice('Select one of the available buses', $this->busNames));
116-
} elseif ($alternatives = $this->findAlternatives($busName, $this->busNames)) {
117-
$io->block(sprintf('Bus "%s" is not defined.', $busName), null, 'error', ' ', true);
118-
119-
if (1 === \count($alternatives)) {
120-
if ($io->confirm(sprintf('Do you want to dispatch to "%s" instead? ', $alternatives[0]), true)) {
121-
$input->setOption('bus', $alternatives[0]);
122-
}
123-
} else {
124-
$input->setOption('bus', $io->choice('Did you mean one of the following buses instead?', $alternatives, $alternatives[0]));
125-
}
126-
}
127-
}
128114
}
129115

130116
/**
@@ -136,12 +122,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136122
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
137123
}
138124

139-
if (!$this->busLocator->has($busName = $input->getOption('bus'))) {
140-
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
141-
}
142-
143125
$receiver = $this->receiverLocator->get($receiverName);
144-
$bus = $this->busLocator->get($busName);
126+
127+
if (null !== $input->getOption('bus')) {
128+
$bus = $this->busLocator->get($input->getOption('bus'));
129+
} else {
130+
$bus = new RoutableMessageBus($this->busLocator);
131+
}
145132

146133
$stopsWhen = [];
147134
if ($limit = $input->getOption('limit')) {
@@ -160,7 +147,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
160147
}
161148

162149
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
163-
$io->success(sprintf('Consuming messages from transport "%s" on bus "%s".', $receiverName, $busName));
150+
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
164151

165152
if ($stopsWhen) {
166153
$last = array_pop($stopsWhen);

‎src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
+1-2Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
248248

249249
$container->getDefinition('console.command.messenger_consume_messages')
250250
->replaceArgument(0, ServiceLocatorTagPass::register($container, $buses))
251-
->replaceArgument(3, array_values($receiverNames))
252-
->replaceArgument(4, $busIds);
251+
->replaceArgument(3, array_values($receiverNames));
253252
}
254253

255254
$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);
+39Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
16+
17+
/**
18+
* Adds the BusNameStamp to the bus.
19+
*
20+
* @experimental in Symfony 4.2
21+
*
22+
* @author Ryan Weaver <ryan@symfonycasts.com>
23+
*/
24+
class AddBusNameStampMiddleware implements MiddlewareInterface
25+
{
26+
private $busName;
27+
28+
public function __construct(string $busName)
29+
{
30+
$this->busName = $busName;
31+
}
32+
33+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
34+
{
35+
$envelope = $envelope->with(new BusNameStamp($this->busName));
36+
37+
return $stack->next()->handle($envelope, $stack);
38+
}
39+
}
+58Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
18+
/**
19+
* Bus of buses that is routable using a BusNameStamp.
20+
*
21+
* This is useful when passed to Worker: messages received
22+
* from the transport can be sent to the correct bus.
23+
*
24+
* @experimental in Symfony 4.2
25+
*
26+
* @author Ryan Weaver <ryan@symfonycasts.com>
27+
*/
28+
class RoutableMessageBus implements MessageBusInterface
29+
{
30+
private $busLocator;
31+
32+
/**
33+
* @param ContainerInterface $busLocator A locator full of MessageBusInterface objects
34+
*/
35+
public function __construct(ContainerInterface $busLocator)
36+
{
37+
$this->busLocator = $busLocator;
38+
}
39+
40+
public function dispatch($envelope): Envelope
41+
{
42+
if (!$envelope instanceof Envelope) {
43+
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope');
44+
}
45+
46+
/** @var BusNameStamp $busNameStamp */
47+
$busNameStamp = $envelope->last(BusNameStamp::class);
48+
if (null === $busNameStamp) {
49+
throw new InvalidArgumentException('Envelope does not contain a BusNameStamp.');
50+
}
51+
52+
if (!$this->busLocator->has($busNameStamp->getBusName())) {
53+
throw new InvalidArgumentException(sprintf('Invalid bus name "%s" on BusNameStamp.', $busNameStamp->getBusName()));
54+
}
55+
56+
return $this->busLocator->get($busNameStamp->getBusName())->dispatch($envelope);
57+
}
58+
}
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* Stamp used to identify which bus it was passed to.
16+
*
17+
* @experimental in Symfony 4.2
18+
*
19+
* @author Ryan Weaver <ryan@symfonycasts.com>
20+
*/
21+
class BusNameStamp implements StampInterface
22+
{
23+
private $busName;
24+
25+
public function __construct(string $busName)
26+
{
27+
$this->busName = $busName;
28+
}
29+
30+
public function getBusName(): string
31+
{
32+
return $this->busName;
33+
}
34+
}

‎src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
262262
(new MessengerPass())->process($container);
263263

264264
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(3));
265-
$this->assertSame(['message_bus'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
266265
}
267266

268267
public function testItShouldNotThrowIfGeneratorIsReturnedInsteadOfArray()
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
16+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
17+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
18+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
19+
20+
class AddBusNameStampMiddlewareTest extends MiddlewareTestCase
21+
{
22+
public function testItSendsTheMessageToAssignedSender()
23+
{
24+
$middleware = new AddBusNameStampMiddleware('the_bus_name');
25+
$envelope = new Envelope(new DummyMessage('the message'));
26+
27+
$finalEnvelope = $middleware->handle($envelope, $this->getStackMock());
28+
/** @var BusNameStamp $busNameStamp */
29+
$busNameStamp = $finalEnvelope->last(BusNameStamp::class);
30+
$this->assertNotNull($busNameStamp);
31+
$this->assertSame('the_bus_name', $busNameStamp->getBusName());
32+
}
33+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.