Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e157ded

Browse filesBrowse files
committed
feature #24411 [Messenger] Add a new Messenger component (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #24411). Discussion ---------- [Messenger] Add a new Messenger component | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #24326 | License | MIT | Doc PR | symfony/symfony-docs#9437 As discussed in #24326. This PR is to help going forward with the discussions of having a Message component. # Resources | What | Where | --- | --- | Documentation | [In the PR](https://github.com/sroze/symfony/blob/add-message-component/src/Symfony/Component/Message/README.md) | Demo | [In `sroze/symfony-demo:message-component-demo`](https://github.com/sroze/symfony-demo/compare/message-component-demo) | [php-enqueue](https://github.com/php-enqueue/enqueue-dev) adapter | 1. Source: [In `sroze/enqueue-bridge`](https://github.com/sroze/enqueue-bridge) _(to be moved as `symfony/enqueue-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-enqueue`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-enqueue) | [Swarrot](https://github.com/swarrot/swarrot) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/swarrot-bridge`](https://github.com/sroze/swarrot-bridge) _(to be moved as `symfony/swarrot-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-swarrot`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-swarrot) | [HTTP](https://github.com/sroze/message-http-adapter) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/message-http-adapter`](https://github.com/sroze/message-http-adapter) <br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-http-adapter`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-http-adapter) | Web profiler integration | _In the pull-request_ # Important points 1. **Tests are not in the PR as they were written in PhpSpec & Behat.** If we decide to go forward with this approach, I'll translate them to PHPUnit. 2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers. 3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 🤔 I guess that this would replace #23842 & #23315. # Changes from the proposals Based on the comments, a few changes have been made from the proposal. 1. `MessageProducer`s have been renamed to `MessageSender`s 2. `MessageConsumer`s have been renamed to `MessageReceiver`s Commits ------- c9cfda9 [Messenger] Add a new Messenger component
2 parents acf49e9 + c9cfda9 commit e157ded
Copy full SHA for e157ded

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner

48 files changed

+1967
-0
lines changed

‎composer.json

Copy file name to clipboardExpand all lines: composer.json
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"symfony/intl": "self.version",
5656
"symfony/ldap": "self.version",
5757
"symfony/lock": "self.version",
58+
"symfony/messenger": "self.version",
5859
"symfony/monolog-bridge": "self.version",
5960
"symfony/options-resolver": "self.version",
6061
"symfony/process": "self.version",
+87Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bundle\FrameworkBundle\Command;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Console\Command\Command;
16+
use Symfony\Component\Console\Input\InputArgument;
17+
use Symfony\Component\Console\Input\InputInterface;
18+
use Symfony\Component\Console\Input\InputOption;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
use Symfony\Component\Messenger\MessageBusInterface;
21+
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
22+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
23+
use Symfony\Component\Messenger\Worker;
24+
25+
/**
26+
* @author Samuel Roze <samuel.roze@gmail.com>
27+
*/
28+
class MessengerConsumeMessagesCommand extends Command
29+
{
30+
protected static $defaultName = 'messenger:consume-messages';
31+
32+
private $bus;
33+
private $receiverLocator;
34+
35+
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
36+
{
37+
parent::__construct();
38+
39+
$this->bus = $bus;
40+
$this->receiverLocator = $receiverLocator;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
protected function configure()
47+
{
48+
$this
49+
->setDefinition(array(
50+
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
51+
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
52+
))
53+
->setDescription('Consumes messages')
54+
->setHelp(<<<'EOF'
55+
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
56+
57+
<info>php %command.full_name% <receiver-name></info>
58+
59+
Use the --limit option to limit the number of messages received:
60+
61+
<info>php %command.full_name% <receiver-name> --limit=10</info>
62+
EOF
63+
)
64+
;
65+
}
66+
67+
/**
68+
* {@inheritdoc}
69+
*/
70+
protected function execute(InputInterface $input, OutputInterface $output)
71+
{
72+
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
73+
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
74+
}
75+
76+
if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) {
77+
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class));
78+
}
79+
80+
if ($limit = $input->getOption('limit')) {
81+
$receiver = new MaximumCountReceiver($receiver, $limit);
82+
}
83+
84+
$worker = new Worker($receiver, $this->bus);
85+
$worker->run();
86+
}
87+
}

‎src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\HttpFoundation\RequestStack;
1919
use Symfony\Component\HttpFoundation\Session\SessionInterface;
2020
use Symfony\Component\HttpKernel\HttpKernelInterface;
21+
use Symfony\Component\Messenger\MessageBusInterface;
2122
use Symfony\Component\Routing\RouterInterface;
2223
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
2324
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
@@ -84,6 +85,7 @@ public static function getSubscribedServices()
8485
'security.token_storage' => '?'.TokenStorageInterface::class,
8586
'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class,
8687
'parameter_bag' => '?'.ContainerInterface::class,
88+
'message_bus' => '?'.MessageBusInterface::class,
8789
);
8890
}
8991
}

‎src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php
+16Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,4 +382,20 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool
382382

383383
return $this->container->get('security.csrf.token_manager')->isTokenValid(new CsrfToken($id, $token));
384384
}
385+
386+
/**
387+
* Dispatches a message to the bus.
388+
*
389+
* @param object $message The message to dispatch
390+
*
391+
* @final
392+
*/
393+
protected function dispatchMessage($message)
394+
{
395+
if (!$this->container->has('message_bus')) {
396+
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
397+
}
398+
399+
return $this->container->get('message_bus')->dispatch($message);
400+
}
385401
}
+93Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bundle\FrameworkBundle\DataCollector;
13+
14+
use Symfony\Component\HttpFoundation\Request;
15+
use Symfony\Component\HttpFoundation\Response;
16+
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
17+
use Symfony\Component\Messenger\MiddlewareInterface;
18+
19+
/**
20+
* @author Samuel Roze <samuel.roze@gmail.com>
21+
*/
22+
class MessengerDataCollector extends DataCollector implements MiddlewareInterface
23+
{
24+
/**
25+
* {@inheritdoc}
26+
*/
27+
public function collect(Request $request, Response $response, \Exception $exception = null)
28+
{
29+
return $this->data;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function getName()
36+
{
37+
return 'messages';
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function reset()
44+
{
45+
$this->data = array();
46+
}
47+
48+
/**
49+
* {@inheritdoc}
50+
*/
51+
public function handle($message, callable $next)
52+
{
53+
$debugRepresentation = array(
54+
'message' => array(
55+
'type' => get_class($message),
56+
),
57+
);
58+
59+
$exception = null;
60+
try {
61+
$result = $next($message);
62+
63+
if (is_object($result)) {
64+
$debugRepresentation['result'] = array(
65+
'type' => get_class($result),
66+
);
67+
} else {
68+
$debugRepresentation['result'] = array(
69+
'type' => gettype($result),
70+
'value' => $result,
71+
);
72+
}
73+
} catch (\Throwable $exception) {
74+
$debugRepresentation['exception'] = array(
75+
'type' => get_class($exception),
76+
'message' => $exception->getMessage(),
77+
);
78+
}
79+
80+
$this->data[] = $debugRepresentation;
81+
82+
if (null !== $exception) {
83+
throw $exception;
84+
}
85+
86+
return $result;
87+
}
88+
89+
public function getMessages(): array
90+
{
91+
return $this->data;
92+
}
93+
}

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Symfony\Component\Lock\Lock;
2323
use Symfony\Component\Lock\Store\SemaphoreStore;
2424
use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface;
25+
use Symfony\Component\Messenger\MessageBusInterface;
2526
use Symfony\Component\Serializer\Serializer;
2627
use Symfony\Component\Translation\Translator;
2728
use Symfony\Component\Validator\Validation;
@@ -102,6 +103,7 @@ public function getConfigTreeBuilder()
102103
$this->addPhpErrorsSection($rootNode);
103104
$this->addWebLinkSection($rootNode);
104105
$this->addLockSection($rootNode);
106+
$this->addMessengerSection($rootNode);
105107

106108
return $treeBuilder;
107109
}
@@ -956,4 +958,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode)
956958
->end()
957959
;
958960
}
961+
962+
private function addMessengerSection(ArrayNodeDefinition $rootNode)
963+
{
964+
$rootNode
965+
->children()
966+
->arrayNode('messenger')
967+
->info('Messenger configuration')
968+
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
969+
->children()
970+
->arrayNode('routing')
971+
->useAttributeAsKey('message_class')
972+
->prototype('array')
973+
->beforeNormalization()
974+
->ifString()
975+
->then(function ($v) {
976+
return array('senders' => array($v));
977+
})
978+
->end()
979+
->children()
980+
->arrayNode('senders')
981+
->requiresAtLeastOneElement()
982+
->prototype('scalar')->end()
983+
->end()
984+
->end()
985+
->end()
986+
->end()
987+
->end()
988+
->end()
989+
->end()
990+
;
991+
}
959992
}

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
use Symfony\Component\Lock\LockInterface;
6060
use Symfony\Component\Lock\Store\StoreFactory;
6161
use Symfony\Component\Lock\StoreInterface;
62+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
63+
use Symfony\Component\Messenger\Transport\SenderInterface;
6264
use Symfony\Component\PropertyAccess\PropertyAccessor;
6365
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
6466
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
@@ -267,6 +269,12 @@ public function load(array $configs, ContainerBuilder $container)
267269
$this->registerLockConfiguration($config['lock'], $container, $loader);
268270
}
269271

272+
if ($this->isConfigEnabled($container, $config['messenger'])) {
273+
$this->registerMessengerConfiguration($config['messenger'], $container, $loader);
274+
} else {
275+
$container->removeDefinition('console.command.messenger_consume_messages');
276+
}
277+
270278
if ($this->isConfigEnabled($container, $config['web_link'])) {
271279
if (!class_exists(HttpHeaderSerializer::class)) {
272280
throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.');
@@ -334,6 +342,10 @@ public function load(array $configs, ContainerBuilder $container)
334342
->addTag('validator.constraint_validator');
335343
$container->registerForAutoconfiguration(ObjectInitializerInterface::class)
336344
->addTag('validator.initializer');
345+
$container->registerForAutoconfiguration(ReceiverInterface::class)
346+
->addTag('messenger.receiver');
347+
$container->registerForAutoconfiguration(SenderInterface::class)
348+
->addTag('messenger.sender');
337349

338350
if (!$container->getParameter('kernel.debug')) {
339351
// remove tagged iterator argument for resource checkers
@@ -1415,6 +1427,26 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
14151427
}
14161428
}
14171429

1430+
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
1431+
{
1432+
$loader->load('messenger.xml');
1433+
1434+
$senderLocatorMapping = array();
1435+
$messageToSenderIdsMapping = array();
1436+
foreach ($config['routing'] as $message => $messageConfiguration) {
1437+
foreach ($messageConfiguration['senders'] as $sender) {
1438+
if (null !== $sender) {
1439+
$senderLocatorMapping[$sender] = new Reference($sender);
1440+
}
1441+
}
1442+
1443+
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
1444+
}
1445+
1446+
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
1447+
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
1448+
}
1449+
14181450
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
14191451
{
14201452
$version = new Parameter('container.build_id');

‎src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass;
3535
use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass;
3636
use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass;
37+
use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
3738
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
3839
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
3940
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
@@ -118,6 +119,7 @@ public function build(ContainerBuilder $container)
118119
$container->addCompilerPass(new ResettableServicePass());
119120
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
120121
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
122+
$this->addCompilerPassIfExists($container, MessengerPass::class);
121123

122124
if ($container->getParameter('kernel.debug')) {
123125
$container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32);

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml
+7Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@
6969
<tag name="console.command" command="debug:event-dispatcher" />
7070
</service>
7171

72+
<service id="console.command.messenger_consume_messages" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand">
73+
<argument type="service" id="message_bus" />
74+
<argument type="service" id="messenger.receiver_locator" />
75+
76+
<tag name="console.command" command="messenger:consume-messages" />
77+
</service>
78+
7279
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
7380
<argument type="service" id="router" />
7481
<tag name="console.command" command="debug:router" />

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.