Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit c1d54a8

Browse filesBrowse files
committed
limit consumer to specific queues with command argument
1 parent fb51ddc commit c1d54a8
Copy full SHA for c1d54a8

File tree

4 files changed

+30
-35
lines changed
Filter options

4 files changed

+30
-35
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpReceiver.php
+7-16Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1717
use Symfony\Component\Messenger\Exception\TransportException;
1818
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
19-
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
19+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2020
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2121
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2222
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -26,16 +26,11 @@
2626
*
2727
* @author Samuel Roze <samuel.roze@gmail.com>
2828
*/
29-
class AmqpReceiver implements ReceiverInterface, QueueAwareInterface, MessageCountAwareInterface
29+
class AmqpReceiver implements ReceiverInterface, QueueReceiverInterface, MessageCountAwareInterface
3030
{
3131
private $serializer;
3232
private $connection;
3333

34-
/**
35-
* @var string[]
36-
*/
37-
private $queueNames;
38-
3934
public function __construct(Connection $connection, SerializerInterface $serializer = null)
4035
{
4136
$this->connection = $connection;
@@ -47,18 +42,14 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4742
*/
4843
public function get(): iterable
4944
{
50-
if (!$this->queueNames) {
51-
$this->queueNames = $this->connection->getQueueNames();
52-
}
53-
54-
foreach ($this->queueNames as $queueName) {
55-
yield from $this->getEnvelope($queueName);
56-
}
45+
yield from $this->getFromQueues($this->connection->getQueueNames());
5746
}
5847

59-
public function setQueueNames(array $queueNames): void
48+
public function getFromQueues(array $queueNames): iterable
6049
{
61-
$this->queueNames = $queueNames;
50+
foreach ($queueNames as $queueName) {
51+
yield from $this->getEnvelope($queueName);
52+
}
6253
}
6354

6455
private function getEnvelope(string $queueName): iterable

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+8-12Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2929
use Symfony\Component\Messenger\EventListener\StopWorkerOnTimeLimitListener;
3030
use Symfony\Component\Messenger\RoutableMessageBus;
31-
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
31+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
3232
use Symfony\Component\Messenger\Worker;
3333

3434
/**
@@ -71,7 +71,7 @@ protected function configure(): void
7171
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can handle new messages'),
7272
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7373
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
74-
new InputOption('queue', null, InputOption::VALUE_REQUIRED|InputOption::VALUE_IS_ARRAY, 'To limit consuming to a only some queues (for receivers that support it)'),
74+
new InputOption('queues', null, InputOption::VALUE_REQUIRED|InputOption::VALUE_IS_ARRAY, 'To limit consuming to a only some queues (for receivers that support it)'),
7575
])
7676
->setDescription('Consumes messages')
7777
->setHelp(<<<'EOF'
@@ -198,20 +198,16 @@ protected function execute(InputInterface $input, OutputInterface $output)
198198
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
199199
}
200200

201-
if ($queues = $input->getOption('queue')) {
202-
foreach ($receivers as $receiver) {
203-
if (!$receiver instanceof QueueAwareInterface) {
204-
throw new \RuntimeException();
205-
}
206-
$receiver->setQueues($queues);
207-
}
208-
}
209201
$bus = $input->getOption('bus') ? $this->routableBus->getMessageBus($input->getOption('bus')) : $this->routableBus;
210202

211203
$worker = new Worker($receivers, $bus, $this->eventDispatcher, $this->logger);
212-
$worker->run([
204+
$options = [
213205
'sleep' => $input->getOption('sleep') * 1000000,
214-
]);
206+
];
207+
if ($queues = $input->getOption('queues')) {
208+
$options['queues'] = $queues;
209+
}
210+
$worker->run($options);
215211

216212
return 0;
217213
}

‎src/Symfony/Component/Messenger/Transport/Receiver/QueueAwareInterface.php renamed to ‎src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php
+3-6Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,15 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\Receiver;
1313

14-
use Symfony\Component\Messenger\Envelope;
15-
use Symfony\Component\Messenger\Exception\TransportException;
16-
1714
/**
1815
* @author David Buchmann <mail@davidbu.ch>
1916
*/
20-
interface QueueAwareInterface
17+
interface QueueReceiverInterface
2118
{
2219
/**
23-
* Limit this receiver to the specified queue names instead of consuming from all queues.
20+
* Get messages from the specified queue names instead of consuming from all queues.
2421
*
2522
* @param string[] $queueNames
2623
*/
27-
public function setQueueNames(array $queueNames): void;
24+
public function getFromQueues(array $queueNames): iterable;
2825
}

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+12-1Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2525
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2626
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
27+
use Symfony\Component\Messenger\Transport\Receiver\QueueAwareInterface;
28+
use Symfony\Component\Messenger\Transport\Receiver\QueueReceiverInterface;
2729
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2830
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2931

@@ -57,6 +59,7 @@ public function __construct(array $receivers, MessageBusInterface $bus, EventDis
5759
*
5860
* Valid options are:
5961
* * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
62+
* * queues: The queue names to consume from, instead of consuming from all queues. All receivers must implement QueueReceiverInterface
6063
*/
6164
public function run(array $options = []): void
6265
{
@@ -65,11 +68,19 @@ public function run(array $options = []): void
6568
$options = array_merge([
6669
'sleep' => 1000000,
6770
], $options);
71+
$queueNames = $options['queues'] ?? false;
6872

6973
while (false === $this->shouldStop) {
7074
$envelopeHandled = false;
7175
foreach ($this->receivers as $transportName => $receiver) {
72-
$envelopes = $receiver->get();
76+
if ($queueNames) {
77+
if (!$receiver instanceof QueueReceiverInterface) {
78+
throw new \RuntimeException("Receiver $transportName does not implement ".QueueReceiverInterface::class);
79+
}
80+
$envelopes = $receiver->getFromQueues($queueNames);
81+
} else {
82+
$envelopes = $receiver->get();
83+
}
7384

7485
foreach ($envelopes as $envelope) {
7586
$envelopeHandled = true;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.