Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 3864a11

Browse filesBrowse files
committed
Adding the ability to consume multiple transports in one Worker
1 parent bf89cd6 commit 3864a11
Copy full SHA for 3864a11

13 files changed

+181
-95
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ CHANGELOG
2323
* The default command name for `ConsumeMessagesCommand` was
2424
changed from `messenger:consume-messages` to `messenger:consume`
2525
* `ConsumeMessagesCommand` has two new optional constructor arguments
26-
* `Worker` has 4 new option constructor arguments.
26+
* [BC BREAK] The first argument to Worker changed from a single
27+
`ReceiverInterface` to an array of `ReceiverInterface`.
28+
* `Worker` has 3 new optional constructor arguments.
2729
* The `Worker` class now handles calling `pcntl_signal_dispatch()` the
2830
receiver no longer needs to call this.
2931
* The `AmqpSender` will now retry messages using a dead-letter exchange

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+40-19Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use Symfony\Component\Console\Input\InputInterface;
2020
use Symfony\Component\Console\Input\InputOption;
2121
use Symfony\Component\Console\Output\OutputInterface;
22+
use Symfony\Component\Console\Question\ChoiceQuestion;
2223
use Symfony\Component\Console\Style\SymfonyStyle;
2324
use Symfony\Component\Messenger\RoutableMessageBus;
2425
use Symfony\Component\Messenger\Worker;
@@ -70,7 +71,7 @@ protected function configure(): void
7071

7172
$this
7273
->setDefinition([
73-
new InputArgument('receiver', $defaultReceiverName ? InputArgument::OPTIONAL : InputArgument::REQUIRED, 'Name of the receiver', $defaultReceiverName),
74+
new InputArgument('receivers', InputArgument::IS_ARRAY, 'Names of the receivers/transports to consume in order of priority', $defaultReceiverName ? [$defaultReceiverName] : []),
7475
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
7576
new InputOption('memory-limit', 'm', InputOption::VALUE_REQUIRED, 'The memory limit the worker can consume'),
7677
new InputOption('time-limit', 't', InputOption::VALUE_REQUIRED, 'The time limit in seconds the worker can run'),
@@ -83,6 +84,10 @@ protected function configure(): void
8384
8485
<info>php %command.full_name% <receiver-name></info>
8586
87+
To receive from multiple transports, pass each name:
88+
89+
<info>php %command.full_name% receiver1 receiver2</info>
90+
8691
Use the --limit option to limit the number of messages received:
8792
8893
<info>php %command.full_name% <receiver-name> --limit=10</info>
@@ -112,16 +117,22 @@ protected function interact(InputInterface $input, OutputInterface $output)
112117
{
113118
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
114119

115-
if ($this->receiverNames && !$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
116-
if (null === $receiverName) {
117-
$io->block('Missing receiver argument.', null, 'error', ' ', true);
118-
$input->setArgument('receiver', $io->choice('Select one of the available receivers', $this->receiverNames));
119-
} elseif ($alternatives = $this->findAlternatives($receiverName, $this->receiverNames)) {
120-
$io->block(sprintf('Receiver "%s" is not defined.', $receiverName), null, 'error', ' ', true);
121-
if ($io->confirm(sprintf('Do you want to receive from "%s" instead? ', $alternatives[0]), false)) {
122-
$input->setArgument('receiver', $alternatives[0]);
123-
}
120+
if ($this->receiverNames && 0 === count($input->getArgument('receivers'))) {
121+
$io->block('Which transports/receivers do you want to consume?', null, 'fg=white;bg=blue', ' ', true);
122+
123+
$io->writeln('Choose which receivers you want to consume messages from in order of priority.');
124+
if (count($this->receiverNames) > 1) {
125+
$io->writeln(sprintf('Hint: to consume from multiple, use a list of their names, e.g. <comment>%s</comment>', implode(', ', $this->receiverNames)));
124126
}
127+
128+
$question = new ChoiceQuestion('Select receivers to consume:', $this->receiverNames, 0);
129+
$question->setMultiselect(true);
130+
131+
$input->setArgument('receivers', $io->askQuestion($question));
132+
}
133+
134+
if (0 === count($input->getArgument('receivers'))) {
135+
throw new RuntimeException('Please pass at least one receiver.');
125136
}
126137
}
127138

@@ -136,24 +147,34 @@ protected function execute(InputInterface $input, OutputInterface $output): void
136147
$output->writeln(sprintf('<comment>%s</comment>', $message));
137148
}
138149

139-
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
140-
throw new RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
141-
}
150+
$receivers = [];
151+
$retryStrategies = [];
152+
foreach ($receiverNames = $input->getArgument('receivers') as $receiverName) {
153+
if (!$this->receiverLocator->has($receiverName)) {
154+
$message = sprintf('The receiver "%s" does not exist.', $receiverName);
155+
if ($this->receiverNames) {
156+
$message .= sprintf(' Valid receivers are: %s.', implode(', ', $this->receiverNames));
157+
}
158+
159+
throw new RuntimeException($message);
160+
}
161+
162+
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
163+
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
164+
}
142165

143-
if (null !== $this->retryStrategyLocator && !$this->retryStrategyLocator->has($receiverName)) {
144-
throw new RuntimeException(sprintf('Receiver "%s" does not have a configured retry strategy.', $receiverName));
166+
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
167+
$retryStrategies[$receiverName] = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
145168
}
146169

147-
$receiver = $this->receiverLocator->get($receiverName);
148-
$retryStrategy = null !== $this->retryStrategyLocator ? $this->retryStrategyLocator->get($receiverName) : null;
149170

150171
if (null !== $input->getOption('bus')) {
151172
$bus = $this->busLocator->get($input->getOption('bus'));
152173
} else {
153174
$bus = new RoutableMessageBus($this->busLocator);
154175
}
155176

156-
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
177+
$worker = new Worker($receivers, $bus, $retryStrategies, $this->eventDispatcher, $this->logger);
157178
$stopsWhen = [];
158179
if ($limit = $input->getOption('limit')) {
159180
$stopsWhen[] = "processed {$limit} messages";
@@ -171,7 +192,7 @@ protected function execute(InputInterface $input, OutputInterface $output): void
171192
}
172193

173194
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
174-
$io->success(sprintf('Consuming messages from transport "%s".', $receiverName));
195+
$io->success(sprintf('Consuming messages from transport%s "%s".', count($receivers) > 0 ? 's' : '', implode(', ', $receiverNames)));
175196

176197
if ($stopsWhen) {
177198
$last = array_pop($stopsWhen);

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+2-10Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,8 @@ class ConsumeMessagesCommandTest extends TestCase
2020
public function testConfigurationWithDefaultReceiver()
2121
{
2222
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp']);
23-
$inputArgument = $command->getDefinition()->getArgument('receiver');
23+
$inputArgument = $command->getDefinition()->getArgument('receivers');
2424
$this->assertFalse($inputArgument->isRequired());
25-
$this->assertSame('amqp', $inputArgument->getDefault());
26-
}
27-
28-
public function testConfigurationWithoutDefaultReceiver()
29-
{
30-
$command = new ConsumeMessagesCommand($this->createMock(ServiceLocator::class), $this->createMock(ServiceLocator::class), null, ['amqp', 'dummy']);
31-
$inputArgument = $command->getDefinition()->getArgument('receiver');
32-
$this->assertTrue($inputArgument->isRequired());
33-
$this->assertNull($inputArgument->getDefault());
25+
$this->assertSame(['amqp'], $inputArgument->getDefault());
3426
}
3527
}

‎src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Fixtures/DummyWorker.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public function __construct(array $envelopesToReceive)
1515
$this->envelopesToReceive = $envelopesToReceive;
1616
}
1717

18-
public function run(callable $onHandledCallback = null): void
18+
public function run(array $options = [], callable $onHandledCallback = null): void
1919
{
2020
foreach ($this->envelopesToReceive as $envelope) {
2121
if (true === $this->isStopped) {

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+20-8Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,20 @@ public function testItSendsAndReceivesMessages()
5757
$sender->send($first = new Envelope(new DummyMessage('First')));
5858
$sender->send($second = new Envelope(new DummyMessage('Second')));
5959

60-
$envelope = $receiver->get();
60+
$envelopes = iterator_to_array($receiver->get());
61+
$this->assertCount(1, $envelopes);
62+
/** @var Envelope $envelope */
63+
$envelope = $envelopes[0];
6164
$this->assertEquals($first->getMessage(), $envelope->getMessage());
6265
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
6366

64-
$envelope = $receiver->get();
65-
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
67+
$envelopes = iterator_to_array($receiver->get());
68+
$this->assertCount(1, $envelopes);
69+
/** @var Envelope $envelope */
70+
$envelope = $envelopes[0];
71+
$this->assertEquals($second->getMessage(), $envelope->getMessage());
6672

67-
$this->assertNull($receiver->get());
73+
$this->assertEmpty(iterator_to_array($receiver->get()));
6874
}
6975

7076
public function testRetryAndDelay()
@@ -80,20 +86,26 @@ public function testRetryAndDelay()
8086

8187
$sender->send($first = new Envelope(new DummyMessage('First')));
8288

83-
$envelope = $receiver->get();
89+
$envelopes = iterator_to_array($receiver->get());
90+
/** @var Envelope $envelope */
91+
$envelope = $envelopes[0];
8492
$newEnvelope = $envelope
8593
->with(new DelayStamp(2000))
8694
->with(new RedeliveryStamp(1, 'not_important'));
8795
$sender->send($newEnvelope);
8896
$receiver->ack($envelope);
8997

90-
$envelope = null;
98+
$envelopes = [];
9199
$startTime = time();
92100
// wait for next message, but only for max 3 seconds
93-
while (null === $envelope && $startTime + 3 > time()) {
94-
$envelope = $receiver->get();
101+
while (0 === count($envelopes) && $startTime + 3 > time()) {
102+
$envelopes = iterator_to_array($receiver->get());
95103
}
96104

105+
$this->assertCount(1, $envelopes);
106+
/** @var Envelope $envelope */
107+
$envelope = $envelopes[0];
108+
97109
// should have a 2 second delay
98110
$this->assertGreaterThanOrEqual($startTime + 2, time());
99111
// but only a 2 second delay

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+3-2Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public function testItReturnsTheDecodedMessageToTheHandler()
3939
$connection->method('get')->willReturn($amqpEnvelope);
4040

4141
$receiver = new AmqpReceiver($connection, $serializer);
42-
$actualEnvelope = $receiver->get();
43-
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
42+
$actualEnvelopes = iterator_to_array($receiver->get());
43+
$this->assertCount(1, $actualEnvelopes);
44+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4445
}
4546

4647
/**

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public function testReceivesMessages()
4747
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
4848
$connection->method('get')->willReturn($amqpEnvelope);
4949

50-
$envelope = $transport->get();
51-
$this->assertSame($decodedMessage, $envelope->getMessage());
50+
$envelopes = iterator_to_array($transport->get());
51+
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5252
}
5353

5454
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/Fixtures/long_receiver.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
$receiver = new AmqpReceiver($connection, $serializer);
3333
$retryStrategy = new MultiplierRetryStrategy(3, 0);
3434

35-
$worker = new Worker($receiver, new class() implements MessageBusInterface {
35+
$worker = new Worker(['the_receiver' => $receiver], new class() implements MessageBusInterface {
3636
public function dispatch($envelope): Envelope
3737
{
3838
echo 'Get envelope with message: '.\get_class($envelope->getMessage())."\n";
@@ -43,7 +43,7 @@ public function dispatch($envelope): Envelope
4343

4444
return $envelope;
4545
}
46-
}, 'the_receiver', $retryStrategy);
46+
});
4747

4848
echo "Receiving messages...\n";
4949
$worker->run();

‎src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Worker/StopWhenMemoryUsageIsExceededWorkerTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function testWorkerStopsWhenMemoryLimitExceeded(int $memoryUsage, int $me
3737
};
3838

3939
$memoryLimitWorker = new StopWhenMemoryUsageIsExceededWorker($decoratedWorker, $memoryLimit, null, $memoryResolver);
40-
$memoryLimitWorker->run($handledCallback);
40+
$memoryLimitWorker->run([], $handledCallback);
4141

4242
// handler should be called exactly 1 time
4343
$this->assertSame($handlerCalledTimes, 1);

‎src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Worker/StopWhenMessageCountIsExceededWorkerTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public function testWorkerStopsWhenMaximumCountExceeded($max, $shouldStop)
3939
]);
4040

4141
$maximumCountWorker = new StopWhenMessageCountIsExceededWorker($decoratedWorker, $max);
42-
$maximumCountWorker->run($handledCallback);
42+
$maximumCountWorker->run([], $handledCallback);
4343

4444
$this->assertSame($shouldStop, $decoratedWorker->isStopped());
4545
}

‎src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Worker/StopWhenTimeLimitIsReachedWorkerTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public function testWorkerStopsWhenTimeLimitIsReached()
3434
->with('Worker stopped due to time limit of {timeLimit}s reached', ['timeLimit' => 1]);
3535

3636
$timeoutWorker = new StopWhenTimeLimitIsReachedWorker($decoratedWorker, 1, $logger);
37-
$timeoutWorker->run(function () {
37+
$timeoutWorker->run([], function () {
3838
sleep(2);
3939
});
4040

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.