Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cbaae5d

Browse filesBrowse files
author
Peter van der Wal
committed
[Messenger] Prioritize receivers via transport configuration
1 parent a980a46 commit cbaae5d
Copy full SHA for cbaae5d

File tree

Expand file treeCollapse file tree

11 files changed

+46
-4
lines changed
Filter options
Expand file treeCollapse file tree

11 files changed

+46
-4
lines changed

‎src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Make the `config/` directory optional in `MicroKernelTrait`, add support for service arguments in the
1010
invokable Kernel class, and register `FrameworkBundle` by default when the `bundles.php` file is missing
1111
* [BC BREAK] The `secrets:decrypt-to-local` command terminates with a non-zero exit code when a secret could not be read
12+
* Add `priority` option to the `messenger.transports` configurations and pass the configured priority to the `'messenger.receiver'` service tag
1213

1314
7.1
1415
---

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,7 @@ function ($a) {
16121612
->defaultNull()
16131613
->info('Rate limiter name to use when processing messages')
16141614
->end()
1615+
->integerNode('priority')->defaultValue(0)->info('Priority of this transport when the consumer is executed with the --all flag')->end()
16151616
->end()
16161617
->end()
16171618
->end()

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2220,6 +2220,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22202220
->addTag('messenger.receiver', [
22212221
'alias' => $name,
22222222
'is_failure_transport' => \in_array($name, $failureTransports, true),
2223+
'priority' => $transport['priority'],
22232224
])
22242225
;
22252226
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@
606606
<xsd:attribute name="dsn" type="xsd:string" />
607607
<xsd:attribute name="failure-transport" type="xsd:string" />
608608
<xsd:attribute name="rate-limiter" type="xsd:string" />
609+
<xsd:attribute name="priority" type="xsd:integer" />
609610
</xsd:complexType>
610611

611612
<xsd:complexType name="messenger_retry_strategy">

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@
2525
],
2626
'rate_limiter' => 'customised_worker'
2727
],
28+
'prioritized' => [
29+
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=priority',
30+
'priority' => 10,
31+
],
2832
'failed' => 'in-memory:///',
2933
'redis' => 'redis://127.0.0.1:6379/messages',
3034
'beanstalkd' => 'beanstalkd://127.0.0.1:11300',

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
</framework:options>
2121
<framework:retry-strategy max-retries="10" delay="7" multiplier="3" max-delay="100"/>
2222
</framework:transport>
23+
<framework:transport name="prioritized" dsn="amqp://localhost/%2f/messages?exchange_name=priority" priority="10" />
2324
<framework:transport name="failed" dsn="in-memory:///" />
2425
<framework:transport name="redis" dsn="redis://127.0.0.1:6379/messages" />
2526
<framework:transport name="beanstalkd" dsn="beanstalkd://127.0.0.1:11300" />

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ framework:
2323
multiplier: 3
2424
max_delay: 100
2525
rate_limiter: customised_worker
26+
prioritized:
27+
dsn: amqp://localhost/%2f/messages?exchange_name=priority
28+
priority: 10
2629
failed: 'in-memory:///'
2730
redis: 'redis://127.0.0.1:6379/messages'
2831
beanstalkd: 'beanstalkd://127.0.0.1:11300'

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php
+18-2Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,7 @@ public function testMessengerMultipleFailureTransports()
873873
$this->assertEquals([
874874
'alias' => 'failure_transport_1',
875875
'is_failure_transport' => true,
876+
'priority' => 0,
876877
], $failureTransport1Tags);
877878

878879
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
@@ -881,6 +882,7 @@ public function testMessengerMultipleFailureTransports()
881882
$this->assertEquals([
882883
'alias' => 'failure_transport_3',
883884
'is_failure_transport' => true,
885+
'priority' => 0,
884886
], $failureTransport3Tags);
885887

886888
// transport 2 exists but does not appear in the mapping
@@ -913,6 +915,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport
913915
$this->assertEquals([
914916
'alias' => 'failure_transport_1',
915917
'is_failure_transport' => true,
918+
'priority' => 0,
916919
], $failureTransport1Tags);
917920

918921
$failureTransport3Definition = $container->getDefinition('messenger.transport.failure_transport_3');
@@ -921,6 +924,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport
921924
$this->assertEquals([
922925
'alias' => 'failure_transport_3',
923926
'is_failure_transport' => true,
927+
'priority' => 0,
924928
], $failureTransport3Tags);
925929

926930
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
@@ -947,15 +951,26 @@ public function testMessengerTransports()
947951
$container = $this->createContainerFromFile('messenger_transports');
948952
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
949953
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
950-
$this->assertEquals([
951-
['alias' => 'default', 'is_failure_transport' => false], ], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
954+
$this->assertEquals([[
955+
'alias' => 'default',
956+
'is_failure_transport' => false,
957+
'priority' => 0,
958+
]], $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
952959
$transportArguments = $container->getDefinition('messenger.transport.default')->getArguments();
953960
$this->assertEquals(new Reference('messenger.default_serializer'), $transportArguments[2]);
954961

955962
$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
956963
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
957964
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();
958965

966+
$this->assertTrue($container->hasDefinition('messenger.transport.prioritized'));
967+
$this->assertTrue($container->getDefinition('messenger.transport.prioritized')->hasTag('messenger.receiver'));
968+
$this->assertEquals([[
969+
'alias' => 'prioritized',
970+
'is_failure_transport' => false,
971+
'priority' => 10,
972+
]], $container->getDefinition('messenger.transport.prioritized')->getTag('messenger.receiver'));
973+
959974
$this->assertEquals([new Reference('messenger.transport_factory'), 'createTransport'], $transportFactory);
960975
$this->assertCount(3, $transportArguments);
961976
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
@@ -1000,6 +1015,7 @@ public function testMessengerTransports()
10001015
$failureTransportsByTransportNameServiceLocator = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0);
10011016
$failureTransports = $container->getDefinition((string) $failureTransportsByTransportNameServiceLocator)->getArgument(0);
10021017
$expectedTransportsByFailureTransports = [
1018+
'prioritized' => new Reference('messenger.transport.failed'),
10031019
'beanstalkd' => new Reference('messenger.transport.failed'),
10041020
'customised' => new Reference('messenger.transport.failed'),
10051021
'default' => new Reference('messenger.transport.failed'),

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ CHANGELOG
77
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
88
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
99
* Add `--format` option to the `messenger:stats` command
10+
* Allow prioritizing receivers so that `messenger:consume --all` consumes receivers in a predefined order
1011

1112
7.1
1213
---

‎src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php
+10Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,8 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
242242
{
243243
$receiverMapping = [];
244244
$failureTransportsMap = [];
245+
$receiverPriority = [];
246+
245247
if ($container->hasDefinition('console.command.messenger_failed_messages_retry')) {
246248
$commandDefinition = $container->getDefinition('console.command.messenger_failed_messages_retry');
247249
$globalReceiverName = $commandDefinition->getArgument(0);
@@ -263,15 +265,23 @@ private function registerReceivers(ContainerBuilder $container, array $busIds):
263265
$receiverMapping[$id] = new Reference($id);
264266

265267
foreach ($tags as $tag) {
268+
$receiverPriority[$id] = max($tag['priority'] ?? 0, $receiverPriority[$id] ?? PHP_INT_MIN);
269+
266270
if (isset($tag['alias'])) {
267271
$receiverMapping[$tag['alias']] = $receiverMapping[$id];
272+
$receiverPriority[$tag['alias']] = max($tag['priority'] ?? 0, $receiverPriority[$tag['alias']] ?? PHP_INT_MIN);
273+
268274
if ($tag['is_failure_transport'] ?? false) {
269275
$failureTransportsMap[$tag['alias']] = $receiverMapping[$id];
270276
}
271277
}
272278
}
273279
}
274280

281+
$prioritySorter = fn (string $a, string $b): int => $receiverPriority[$b] <=> $receiverPriority[$a];
282+
uksort($receiverMapping, $prioritySorter);
283+
uksort($failureTransportsMap, $prioritySorter);
284+
275285
$receiverNames = [];
276286
foreach ($receiverMapping as $name => $reference) {
277287
$receiverNames[(string) $reference] = $name;

‎src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+5-2Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
use Symfony\Component\DependencyInjection\ServiceLocator;
2525
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
2626
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceiver;
27+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceiver;
2728
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
2829
use Symfony\Component\Messenger\Command\DebugCommand;
2930
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
@@ -448,11 +449,12 @@ public function testItRegistersMultipleReceiversAndSetsTheReceiverNamesOnTheComm
448449
]);
449450

450451
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
452+
$container->register(DoctrineReceiver::class, DoctrineReceiver::class)->addTag('messenger.receiver', ['alias' => 'doctrine', 'priority' => 1]);
451453
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
452454

453455
(new MessengerPass())->process($container);
454456

455-
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
457+
$this->assertSame(['doctrine', 'amqp', 'dummy'], $container->getDefinition('console.command.messenger_consume_messages')->getArgument(4));
456458
}
457459

458460
public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
@@ -464,11 +466,12 @@ public function testItSetsTheReceiverNamesOnTheSetupTransportsCommand()
464466
]);
465467

466468
$container->register(AmqpReceiver::class, AmqpReceiver::class)->addTag('messenger.receiver', ['alias' => 'amqp']);
469+
$container->register(DoctrineReceiver::class, DoctrineReceiver::class)->addTag('messenger.receiver', ['alias' => 'doctrine', 'priority' => 1]);
467470
$container->register(DummyReceiver::class, DummyReceiver::class)->addTag('messenger.receiver', ['alias' => 'dummy']);
468471

469472
(new MessengerPass())->process($container);
470473

471-
$this->assertSame(['amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
474+
$this->assertSame(['doctrine', 'amqp', 'dummy'], $container->getDefinition('console.command.messenger_setup_transports')->getArgument(1));
472475
}
473476

474477
public function testItRegistersHandlersOnDifferentBuses()

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.