Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e7c770a

Browse filesBrowse files
committed
Changes ReceiverInterface::handle() to get() to give more control to Worker
1 parent 76260e7 commit e7c770a
Copy full SHA for e7c770a

File tree

Expand file treeCollapse file tree

6 files changed

+43
-47
lines changed
Filter options
Expand file treeCollapse file tree

6 files changed

+43
-47
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ CHANGELOG
1010
the transport. See `ConsumeMessagesCommand`.
1111
* The optional `$busNames` constructor argument of the class `ConsumeMessagesCommand` was removed.
1212
* [BC BREAK] 2 new methods were added to `ReceiverInterface`:
13-
`ack()` and `reject()`.
13+
`ack()` and `reject()` and `receive()` was changed to `get()`.
1414
* [BC BREAK] Error handling was moved from the receivers into
1515
`Worker`. Implementations of `ReceiverInterface::handle()`
1616
should now allow all exceptions to be thrown, except for transport

‎src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/DependencyInjection/MessengerPassTest.php
+2-4Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -595,11 +595,9 @@ public function __invoke(DummyMessage $message): void
595595

596596
class DummyReceiver implements ReceiverInterface
597597
{
598-
public function receive(callable $handler): void
598+
public function get(string $queue = null): ?Envelope
599599
{
600-
for ($i = 0; $i < 3; ++$i) {
601-
$handler(new Envelope(new DummyMessage("Dummy $i")));
602-
}
600+
return new Envelope(new DummyMessage('Dummy'));
603601
}
604602

605603
public function stop(): void

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+25-30Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,38 +41,33 @@ public function __construct(Connection $connection, SerializerInterface $seriali
4141
/**
4242
* {@inheritdoc}
4343
*/
44-
public function receive(callable $handler): void
44+
public function get(string $queue = null): ?Envelope
4545
{
46-
while (!$this->shouldStop) {
47-
try {
48-
$amqpEnvelope = $this->connection->get();
49-
} catch (\AMQPException $exception) {
50-
throw new TransportException($exception->getMessage(), 0, $exception);
51-
}
52-
53-
if (null === $amqpEnvelope) {
54-
$handler(null);
55-
56-
usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? 200000);
57-
58-
continue;
59-
}
60-
61-
try {
62-
$envelope = $this->serializer->decode([
63-
'body' => $amqpEnvelope->getBody(),
64-
'headers' => $amqpEnvelope->getHeaders(),
65-
]);
66-
} catch (MessageDecodingFailedException $exception) {
67-
// invalid message of some type
68-
$this->rejectAmqpEnvelope($amqpEnvelope);
69-
70-
throw $exception;
71-
}
72-
73-
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
74-
$handler($envelope);
46+
try {
47+
$amqpEnvelope = $this->connection->get($queue);
48+
} catch (\AMQPException $exception) {
49+
throw new TransportException($exception->getMessage(), 0, $exception);
50+
}
51+
52+
if (null === $amqpEnvelope) {
53+
return null;
7554
}
55+
56+
try {
57+
$envelope = $this->serializer->decode([
58+
'body' => $amqpEnvelope->getBody(),
59+
'headers' => $amqpEnvelope->getHeaders(),
60+
]);
61+
} catch (MessageDecodingFailedException $exception) {
62+
// invalid message of some type
63+
$this->rejectAmqpEnvelope($amqpEnvelope);
64+
65+
throw $exception;
66+
}
67+
68+
$envelope = $envelope->with(new AmqpReceivedStamp($amqpEnvelope));
69+
70+
return $envelope;
7671
}
7772

7873
public function ack(Envelope $envelope): void

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ public function __construct(Connection $connection, SerializerInterface $seriali
3737
/**
3838
* {@inheritdoc}
3939
*/
40-
public function receive(callable $handler): void
40+
public function get(string $queue = null): ?Envelope
4141
{
42-
($this->receiver ?? $this->getReceiver())->receive($handler);
42+
return ($this->receiver ?? $this->getReceiver())->get($queue);
4343
}
4444

4545
/**

‎src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
+1-4Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,13 @@ interface ReceiverInterface
2525
/**
2626
* Receive some messages to the given handler.
2727
*
28-
* The handler will have, as argument, the received {@link \Symfony\Component\Messenger\Envelope} containing the message.
29-
* Note that this envelope can be `null` if the timeout to receive something has expired.
30-
*
3128
* If the received message cannot be decoded, the message should not
3229
* be retried again (e.g. if there's a queue, it should be removed)
3330
* and a MessageDecodingFailedException should be thrown.
3431
*
3532
* @throws TransportException If there is an issue communicating with the transport
3633
*/
37-
public function receive(callable $handler): void;
34+
public function get(string $queue = null): ?Envelope;
3835

3936
/**
4037
* Stop receiving some messages.

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+12-6Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,25 @@ public function __construct(ReceiverInterface $receiver, MessageBusInterface $bu
6262
*/
6363
public function run()
6464
{
65+
$shouldStop = false;
6566
if (\function_exists('pcntl_signal')) {
66-
pcntl_signal(SIGTERM, function () {
67-
$this->receiver->stop();
67+
pcntl_signal(SIGTERM, function () use ($shouldStop) {
68+
$shouldStop = true;
6869
});
6970
}
7071

71-
$this->receiver->receive(function (?Envelope $envelope) {
72+
while (false === $shouldStop) {
73+
$envelope = $this->receiver->get();
74+
7275
if (null === $envelope) {
7376
if (\function_exists('pcntl_signal_dispatch')) {
7477
pcntl_signal_dispatch();
7578
}
7679

77-
return;
80+
// TODO - configurable (on transport? on Worker?)
81+
usleep(200000);
82+
83+
continue;
7884
}
7985

8086
$this->dispatchEvent(new WorkerMessageReceivedEvent($envelope, $this->receiverName));
@@ -124,7 +130,7 @@ public function run()
124130
pcntl_signal_dispatch();
125131
}
126132

127-
return;
133+
continue;
128134
}
129135

130136
$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $this->receiverName));
@@ -138,7 +144,7 @@ public function run()
138144
if (\function_exists('pcntl_signal_dispatch')) {
139145
pcntl_signal_dispatch();
140146
}
141-
});
147+
};
142148
}
143149

144150
private function dispatchEvent(Event $event)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.