Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fa4776f

Browse filesBrowse files
committed
Changes ReceiverInterface::handle() to get() to give more control to Worker
1 parent 91b0afa commit fa4776f
Copy full SHA for fa4776f

25 files changed

+609
-652
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+3-2Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ CHANGELOG
99
to the `Envelope` then find the correct bus when receiving from
1010
the transport. See `ConsumeMessagesCommand`.
1111
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
12-
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
13-
`ack()` and `reject()`.
12+
* [BC BREAK] 3 new methods were added to `ReceiverInterface`:
13+
`ack()`, `reject()` and `get()`. The methods `receive()`
14+
and `stop()` were removed.
1415
* [BC BREAK] Error handling was moved from the receivers into
1516
`Worker`. Implementations of `ReceiverInterface::handle()`
1617
should now allow all exceptions to be thrown, except for transport

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+7-7Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@
2121
use Symfony\Component\Console\Output\OutputInterface;
2222
use Symfony\Component\Console\Style\SymfonyStyle;
2323
use Symfony\Component\Messenger\RoutableMessageBus;
24-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMemoryUsageIsExceededReceiver;
25-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenMessageCountIsExceededReceiver;
26-
use Symfony\Component\Messenger\Transport\Receiver\StopWhenTimeLimitIsReachedReceiver;
2724
use Symfony\Component\Messenger\Worker;
25+
use Symfony\Component\Messenger\Worker\StopWhenMemoryUsageIsExceededWorker;
26+
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
27+
use Symfony\Component\Messenger\Worker\StopWhenTimeLimitIsReachedWorker;
2828
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
2929

3030
/**
@@ -152,20 +152,21 @@ protected function execute(InputInterface $input, OutputInterface $output): void
152152
$bus = new RoutableMessageBus($this->busLocator);
153153
}
154154

155+
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
155156
$stopsWhen = [];
156157
if ($limit = $input->getOption('limit')) {
157158
$stopsWhen[] = "processed {$limit} messages";
158-
$receiver = new StopWhenMessageCountIsExceededReceiver($receiver, $limit, $this->logger);
159+
$worker = new StopWhenMessageCountIsExceededWorker($worker, $limit, $this->logger);
159160
}
160161

161162
if ($memoryLimit = $input->getOption('memory-limit')) {
162163
$stopsWhen[] = "exceeded {$memoryLimit} of memory";
163-
$receiver = new StopWhenMemoryUsageIsExceededReceiver($receiver, $this->convertToBytes($memoryLimit), $this->logger);
164+
$worker = new StopWhenMemoryUsageIsExceededWorker($worker, $this->convertToBytes($memoryLimit), $this->logger);
164165
}
165166

166167
if ($timeLimit = $input->getOption('time-limit')) {
167168
$stopsWhen[] = "been running for {$timeLimit}s";
168-
$receiver = new StopWhenTimeLimitIsReachedReceiver($receiver, $timeLimit, $this->logger);
169+
$worker = new StopWhenTimeLimitIsReachedWorker($worker, $timeLimit, $this->logger);
169170
}
170171

171172
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -183,7 +184,6 @@ protected function execute(InputInterface $input, OutputInterface $output): void
183184
$io->comment('Re-run the command with a -vv option to see logs about consumed messages.');
184185
}
185186

186-
$worker = new Worker($receiver, $bus, $receiverName, $retryStrategy, $this->eventDispatcher, $this->logger);
187187
$worker->run();
188188
}
189189

‎src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+2-4Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -612,11 +612,9 @@ public function __invoke(DummyMessage $message): void
612612

613613
class DummyReceiver implements ReceiverInterface
614614
{
615-
public function receive(callable $handler): void
615+
public function get(string $queue = null): ?Envelope
616616
{
617-
for ($i = 0; $i < 3; ++$i) {
618-
$handler(new Envelope(new DummyMessage("Dummy $i")));
619-
}
617+
return new Envelope(new DummyMessage('Dummy'));
620618
}
621619

622620
public function stop(): void

‎src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Fixtures/CallbackReceiver.php
-48Lines changed: 0 additions & 48 deletions
This file was deleted.
+46Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Tests\Fixtures;
4+
5+
use Symfony\Component\Messenger\WorkerInterface;
6+
7+
class DummyWorker implements WorkerInterface
8+
{
9+
private $isStopped = false;
10+
private $envelopesToReceive;
11+
private $envelopesHandled = 0;
12+
13+
public function __construct(array $envelopesToReceive)
14+
{
15+
$this->envelopesToReceive = $envelopesToReceive;
16+
}
17+
18+
public function run(callable $onHandledCallback = null): void
19+
{
20+
foreach ($this->envelopesToReceive as $envelope) {
21+
if (true === $this->isStopped) {
22+
break;
23+
}
24+
25+
if ($onHandledCallback) {
26+
$onHandledCallback($envelope);
27+
++$this->envelopesHandled;
28+
}
29+
}
30+
}
31+
32+
public function stop(): void
33+
{
34+
$this->isStopped = true;
35+
}
36+
37+
public function isStopped(): bool
38+
{
39+
return $this->isStopped;
40+
}
41+
42+
public function countEnvelopesHandled()
43+
{
44+
return $this->envelopesHandled;
45+
}
46+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+30-73Lines changed: 30 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,14 @@ public function testItSendsAndReceivesMessages()
5757
$sender->send($first = new Envelope(new DummyMessage('First')));
5858
$sender->send($second = new Envelope(new DummyMessage('Second')));
5959

60-
$receivedMessages = 0;
61-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages, $first, $second) {
62-
$expectedEnvelope = 0 === $receivedMessages ? $first : $second;
63-
$this->assertEquals($expectedEnvelope->getMessage(), $envelope->getMessage());
64-
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
65-
66-
if (2 === ++$receivedMessages) {
67-
$receiver->stop();
68-
}
69-
});
60+
$envelope = $receiver->get();
61+
$this->assertEquals($first->getMessage(), $envelope->getMessage());
62+
$this->assertInstanceOf(AmqpReceivedStamp::class, $envelope->last(AmqpReceivedStamp::class));
63+
64+
$envelope = $receiver->get();
65+
$this->assertEquals($envelope->getMessage(), $envelope->getMessage());
66+
67+
$this->assertNull($receiver->get());
7068
}
7169

7270
public function testRetryAndDelay()
@@ -82,50 +80,32 @@ public function testRetryAndDelay()
8280

8381
$sender->send($first = new Envelope(new DummyMessage('First')));
8482

85-
$receivedMessages = 0;
86-
$startTime = time();
87-
$receiver->receive(function (?Envelope $envelope) use ($receiver, $sender, &$receivedMessages, $startTime) {
88-
if (null === $envelope) {
89-
// if we have been processing for 4 seconds + have received 2 messages
90-
// then it's safe to say no other messages will be received
91-
if (time() > $startTime + 4 && 2 === $receivedMessages) {
92-
$receiver->stop();
93-
}
94-
95-
return;
96-
}
97-
98-
++$receivedMessages;
99-
100-
// retry the first time
101-
if (1 === $receivedMessages) {
102-
// imitate what Worker does
103-
$envelope = $envelope
104-
->with(new DelayStamp(2000))
105-
->with(new RedeliveryStamp(1, 'not_important'));
106-
$sender->send($envelope);
107-
$receiver->ack($envelope);
83+
$envelope = $receiver->get();
84+
$newEnvelope = $envelope
85+
->with(new DelayStamp(2000))
86+
->with(new RedeliveryStamp(1, 'not_important'));
87+
$sender->send($newEnvelope);
88+
$receiver->ack($envelope);
10889

109-
return;
110-
}
111-
112-
if (2 === $receivedMessages) {
113-
// should have a 2 second delay
114-
$this->assertGreaterThanOrEqual($startTime + 2, time());
115-
// but only a 2 second delay
116-
$this->assertLessThan($startTime + 4, time());
90+
$envelope = null;
91+
$startTime = time();
92+
// wait for next message, but only for max 3 seconds
93+
while (null === $envelope && $startTime + 3 > time()) {
94+
$envelope = $receiver->get();
95+
}
11796

118-
/** @var RedeliveryStamp|null $retryStamp */
119-
// verify the stamp still exists from the last send
120-
$retryStamp = $envelope->last(RedeliveryStamp::class);
121-
$this->assertNotNull($retryStamp);
122-
$this->assertSame(1, $retryStamp->getRetryCount());
97+
// should have a 2 second delay
98+
$this->assertGreaterThanOrEqual($startTime + 2, time());
99+
// but only a 2 second delay
100+
$this->assertLessThan($startTime + 4, time());
123101

124-
$receiver->ack($envelope);
102+
/** @var RedeliveryStamp|null $retryStamp */
103+
// verify the stamp still exists from the last send
104+
$retryStamp = $envelope->last(RedeliveryStamp::class);
105+
$this->assertNotNull($retryStamp);
106+
$this->assertSame(1, $retryStamp->getRetryCount());
125107

126-
return;
127-
}
128-
});
108+
$receiver->ack($envelope);
129109
}
130110

131111
public function testItReceivesSignals()
@@ -175,29 +155,6 @@ public function testItReceivesSignals()
175155
, $process->getOutput());
176156
}
177157

178-
/**
179-
* @runInSeparateProcess
180-
*/
181-
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
182-
{
183-
$serializer = $this->createSerializer();
184-
185-
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), ['read_timeout' => '1']);
186-
$connection->setup();
187-
$connection->queue()->purge();
188-
189-
$receiver = new AmqpReceiver($connection, $serializer);
190-
191-
$receivedMessages = 0;
192-
$receiver->receive(function (?Envelope $envelope) use ($receiver, &$receivedMessages) {
193-
$this->assertNull($envelope);
194-
195-
if (2 === ++$receivedMessages) {
196-
$receiver->stop();
197-
}
198-
});
199-
}
200-
201158
private function waitForOutput(Process $process, string $output, $timeoutInSeconds = 10)
202159
{
203160
$timedOutTime = time() + $timeoutInSeconds;

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+9-21Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,11 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;
1718
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceiver;
1819
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1920
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
21+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2022
use Symfony\Component\Serializer as SerializerComponent;
2123
use Symfony\Component\Serializer\Encoder\JsonEncoder;
2224
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
@@ -26,7 +28,7 @@
2628
*/
2729
class AmqpReceiverTest extends TestCase
2830
{
29-
public function testItSendTheDecodedMessageToTheHandler()
31+
public function testItReturnsTheDecodedMessageToTheHandler()
3032
{
3133
$serializer = new Serializer(
3234
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
@@ -37,52 +39,38 @@ public function testItSendTheDecodedMessageToTheHandler()
3739
$connection->method('get')->willReturn($amqpEnvelope);
3840

3941
$receiver = new AmqpReceiver($connection, $serializer);
40-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
41-
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
42-
$receiver->stop();
43-
});
42+
$actualEnvelope = $receiver->get();
43+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelope->getMessage());
4444
}
4545

4646
/**
4747
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
4848
*/
4949
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
5050
{
51-
$serializer = new Serializer(
52-
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
53-
);
54-
51+
$serializer = $this->createMock(SerializerInterface::class);
5552
$amqpEnvelope = $this->createAMQPEnvelope();
5653
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
5754
$connection->method('get')->willReturn($amqpEnvelope);
5855
$connection->method('ack')->with($amqpEnvelope)->willThrowException(new \AMQPException());
5956

6057
$receiver = new AmqpReceiver($connection, $serializer);
61-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
62-
$receiver->ack($envelope);
63-
$receiver->stop();
64-
});
58+
$receiver->ack(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
6559
}
6660

6761
/**
6862
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
6963
*/
7064
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
7165
{
72-
$serializer = new Serializer(
73-
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
74-
);
75-
66+
$serializer = $this->createMock(SerializerInterface::class);
7667
$amqpEnvelope = $this->createAMQPEnvelope();
7768
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
7869
$connection->method('get')->willReturn($amqpEnvelope);
7970
$connection->method('nack')->with($amqpEnvelope, AMQP_NOPARAM)->willThrowException(new \AMQPException());
8071

8172
$receiver = new AmqpReceiver($connection, $serializer);
82-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
83-
$receiver->reject($envelope);
84-
$receiver->stop();
85-
});
73+
$receiver->reject(new Envelope(new \stdClass(), new AmqpReceivedStamp($amqpEnvelope)));
8674
}
8775

8876
private function createAMQPEnvelope()

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportTest.php
+2-5Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,8 @@ public function testReceivesMessages()
4747
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
4848
$connection->method('get')->willReturn($amqpEnvelope);
4949

50-
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
51-
$this->assertSame($decodedMessage, $envelope->getMessage());
52-
53-
$transport->stop();
54-
});
50+
$envelope = $transport->get();
51+
$this->assertSame($decodedMessage, $envelope->getMessage());
5552
}
5653

5754
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.