Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 84faecf

Browse filesBrowse files
committed
feature #38973 [Messenger] Allow to limit consumer to specific queues (dbu)
This PR was squashed before being merged into the 5.3-dev branch. Discussion ---------- [Messenger] Allow to limit consumer to specific queues | Q | A | ------------- | --- | Branch? | 5.x for features | Bug fix? | no | New feature? | yes (TODO: changelog) | Deprecations? | no | Tickets | Fix #38630 (i think) | License | MIT | Doc PR | symfony/symfony-docs#... TODO (Note: I am aware that there are other solutions for #38630 that might be more elegant. Our usecase does not use fanout, the reason why we need the functionality is different) **Description** We have a large application where one part is creating messages for products that need reindexing. A transport decorator decides before queueing whether this is a large effort or a small effort, based on some metric. Based on that, it adds a routing key which is then used in rabbitmq to put the message into the "small" or "large" queue. We need two separate consumer processes that consume the small and the large queue, for separate scaling and such. I looked into how we could achieve that. One option is to offer another option in the consume command. That would need to be forwarded to the receiver somehow, i added an interface for it now. The current PR is an illustration of the idea. If you specify a queue that the receiver does not have, things will fail in an inlegeant. If you specify multiple receivers, you can't specify the queue per receiver (though that starts being an odd usecase imho, you could then run two consumers instead) Another option could be to allow configuring multiple receivers for the same transport that get the queue name(s) injected into their constructor. Then you can consume them separately. This currently needs a ton of configuration and some custom code to work. I can look at doing a PR to make this approach simpler, if you prefer it over the option to the consume command... Commits ------- 9af1e20 Adding changelog 81d6a49 [Messenger] Allow to limit consumer to specific queues
2 parents d9f490a + 9af1e20 commit 84faecf
Copy full SHA for 84faecf

File tree

9 files changed

+115
-8
lines changed
Filter options

9 files changed

+115
-8
lines changed

‎.github/composer-config.json

Copy file name to clipboardExpand all lines: .github/composer-config.json
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"preferred-install": {
55
"symfony/form": "source",
66
"symfony/http-kernel": "source",
7+
"symfony/messenger": "source",
78
"symfony/notifier": "source",
89
"symfony/validator": "source",
910
"*": "dist"

‎src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ CHANGELOG
44
5.3
55
---
66

7-
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
7+
* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
8+
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php
+11-3Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
1818
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2020
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2121
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2222

@@ -25,7 +25,7 @@
2525
*
2626
* @author Samuel Roze <samuel.roze@gmail.com>
2727
*/
28-
class AmqpReceiver implements ReceiverInterface, MessageCountAwareInterface
28+
class AmqpReceiver implements QueueReceiverInterface, MessageCountAwareInterface
2929
{
3030
private $serializer;
3131
private $connection;
@@ -41,7 +41,15 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4141
*/
4242
public function get(): iterable
4343
{
44-
foreach ($this->connection->getQueueNames() as $queueName) {
44+
yield from $this->getFromQueues($this->connection->getQueueNames());
45+
}
46+
47+
/**
48+
* {@inheritdoc}
49+
*/
50+
public function getFromQueues(array $queueNames): iterable
51+
{
52+
foreach ($queueNames as $queueName) {
4553
yield from $this->getEnvelope($queueName);
4654
}
4755
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=7.2.5",
2020
"symfony/deprecation-contracts": "^2.1",
21-
"symfony/messenger": "^5.1"
21+
"symfony/messenger": "^5.3"
2222
},
2323
"require-dev": {
2424
"symfony/event-dispatcher": "^4.4|^5.0",

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.
8+
* Added `queues` option to `Worker` to only fetch messages from a specific queue from a receiver implementing `QueueReceiverInterface`.
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+11-2Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ protected function configure(): void
7171
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
7272
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7373
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
74+
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7475
])
7576
->setDescription(self::$defaultDescription)
7677
->setHelp(<<<'EOF'
@@ -104,6 +105,10 @@ protected function configure(): void
104105
messages didn't originate from Messenger:
105106
106107
<info>php %command.full_name% <receiver-name> --bus=event_bus</info>
108+
109+
Use the --queues option to limit a receiver to only certain queues (only supported by some receivers):
110+
111+
<info>php %command.full_name% <receiver-name> --queues=fasttrack</info>
107112
EOF
108113
)
109114
;
@@ -195,9 +200,13 @@ protected function execute(InputInterface $input, OutputInterface $output)
195200
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
196201

197202
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
198-
$worker->run([
203+
$options = [
199204
'sleep' => $input->getOption('sleep') * 1000000,
200-
]);
205+
];
206+
if ($queues = $input->getOption('queues')) {
207+
$options['queues'] = $queues;
208+
}
209+
$worker->run($options);
201210

202211
return 0;
203212
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+37Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
2222
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
2323
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
24+
use Symfony\Component\Messenger\Exception\RuntimeException;
2425
use Symfony\Component\Messenger\MessageBusInterface;
2526
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2627
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2728
use Symfony\Component\Messenger\Stamp\SentStamp;
2829
use Symfony\Component\Messenger\Stamp\StampInterface;
2930
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3032
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3133
use Symfony\Component\Messenger\Worker;
3234
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
@@ -245,6 +247,41 @@ public function testWorkerWithMultipleReceivers()
245247
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
246248
}
247249

250+
public function testWorkerLimitQueues()
251+
{
252+
$envelope = [new Envelope(new DummyMessage('message1'))];
253+
$receiver = $this->createMock(QueueReceiverInterface::class);
254+
$receiver->expects($this->once())
255+
->method('getFromQueues')
256+
->with(['foo'])
257+
->willReturn($envelope)
258+
;
259+
$receiver->expects($this->never())
260+
->method('get')
261+
;
262+
263+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
264+
265+
$dispatcher = new EventDispatcher();
266+
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
267+
268+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
269+
$worker->run(['queues' => ['foo']]);
270+
}
271+
272+
public function testWorkerLimitQueuesUnsupported()
273+
{
274+
$receiver1 = $this->createMock(QueueReceiverInterface::class);
275+
$receiver2 = $this->createMock(ReceiverInterface::class);
276+
277+
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
278+
279+
$worker = new Worker(['transport1' => $receiver1, 'transport2' => $receiver2], $bus);
280+
$this->expectException(RuntimeException::class);
281+
$this->expectExceptionMessage(sprintf('Receiver for "transport2" does not implement "%s".', QueueReceiverInterface::class));
282+
$worker->run(['queues' => ['foo']]);
283+
}
284+
248285
public function testWorkerMessageReceivedEventMutability()
249286
{
250287
$envelope = new Envelope(new DummyMessage('Hello'));
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Receiver;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Some transports may have multiple queues. This interface is used to read from only some queues.
18+
*
19+
* @author David Buchmann <mail@davidbu.ch>
20+
*
21+
* @experimental in 5.3
22+
*/
23+
interface QueueReceiverInterface extends ReceiverInterface
24+
{
25+
/**
26+
* Get messages from the specified queue names instead of consuming from all queues.
27+
*
28+
* @param string[] $queueNames
29+
*
30+
* @return Envelope[]
31+
*/
32+
public function getFromQueues(array $queueNames): iterable;
33+
}

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+18-1Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
2323
use Symfony\Component\Messenger\Exception\HandlerFailedException;
2424
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
25+
use Symfony\Component\Messenger\Exception\RuntimeException;
2526
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2627
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
28+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2729
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2830
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2931

@@ -57,6 +59,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
5759
*
5860
* Valid options are:
5961
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
62+
* * queues: The queue names to consume from, instead of consuming from all queues. When this is used, all receivers must implement the QueueReceiverInterface
6063
*/
6164
public function run(array $options = []): void
6265
{
@@ -65,11 +68,25 @@ public function run(array $options = []): void
6568
$options = array_merge([
6669
'sleep' => 1000000,
6770
], $options);
71+
$queueNames = $options['queues'] ?? false;
72+
73+
if ($queueNames) {
74+
// if queue names are specified, all receivers must implement the QueueReceiverInterface
75+
foreach ($this->receivers as $transportName => $receiver) {
76+
if (!$receiver instanceof QueueReceiverInterface) {
77+
throw new RuntimeException(sprintf('Receiver for "%s" does not implement "%s".', $transportName, QueueReceiverInterface::class));
78+
}
79+
}
80+
}
6881

6982
while (false === $this->shouldStop) {
7083
$envelopeHandled = false;
7184
foreach ($this->receivers as $transportName => $receiver) {
72-
$envelopes = $receiver->get();
85+
if ($queueNames) {
86+
$envelopes = $receiver->getFromQueues($queueNames);
87+
} else {
88+
$envelopes = $receiver->get();
89+
}
7390

7491
foreach ($envelopes as $envelope) {
7592
$envelopeHandled = true;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.