Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit effa72f

Browse filesBrowse files
[Messenger] allow processing messages in batches
1 parent ab7f816 commit effa72f
Copy full SHA for effa72f

File tree

8 files changed

+189
-62
lines changed
Filter options

8 files changed

+189
-62
lines changed
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param \Closure(\Throwable=, mixed=)|null $ack The closure to call in handleBatch() to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return int The number of pending messages in the batch if $ack was provided,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, \Closure $ack = null): int|mixed;
27+
28+
/**
29+
* Handles messages buffered after successive calls to __invoke().
30+
*/
31+
public function handleBatch();
32+
}

‎src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
+26-19Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
public function getName(): string
3638
{
37-
$name = $this->callableName($this->handler);
39+
$name = $this->name ?? $this->callableName($this->handler);
3840
$alias = $this->options['alias'] ?? null;
3941

4042
if (null !== $alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return $name;
4547
}
4648

49+
public function getBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null === $this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return $this->batchHandler;
56+
}
57+
4758
public function getOption(string $option)
4859
{
4960
return $this->options[$option] ?? null;
5061
}
5162

5263
private function callableName(callable $handler): string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
65+
if (!$handler instanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return $handler;
69+
$r = new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name, '{closure}')) {
72+
return $this->name = 'Closure';
6473
}
6574

66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
75+
if (!$handler = $r->getClosureThis()) {
76+
$class = $r->getClosureScopeClass();
77+
78+
return $this->name = ($class ? $class->name.'::' : '').$r->name;
79+
}
7480

75-
return $r->name;
81+
if ($handler instanceof BatchHandlerInterface) {
82+
$this->batchHandler = $handler;
7683
}
7784

78-
return \get_class($handler).'::__invoke';
85+
return $this->name = \get_class($handler).'::'.$r->name;
7986
}
8087
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+39-1Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
1920
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2021
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2122
use Symfony\Component\Messenger\Stamp\HandledStamp;
23+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
24+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2225

2326
/**
2427
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,9 +63,44 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6063

6164
try {
6265
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
66+
$batchHandler = $handlerDescriptor->getBatchHandler();
67+
$acked = false;
68+
$ack = null;
69+
70+
if ($batchHandler && $receivedStamp = $envelope->last(ReceivedStamp::class)) {
71+
$ack = function (\Throwable $e = null, mixed $result = null) use ($envelope, $receivedStamp, $handlerDescriptor, &$acked) {
72+
$acked = true;
73+
if (null === $e) {
74+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
75+
}
76+
$receivedStamp->ack($envelope, $e);
77+
};
78+
}
79+
80+
if (null === $ack) {
81+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
82+
} else {
83+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message, $ack));
84+
85+
if ($acked) {
86+
$ack = null;
87+
} else {
88+
$envelope = $envelope->with(new NoAutoAckStamp());
89+
}
90+
}
91+
6492
$envelope = $envelope->with($handledStamp);
6593
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
94+
95+
if (null !== $ack) {
96+
if (!\is_int($batchSize = $handledStamp->getResult()) || 0 >= $batchSize) {
97+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debut_type(batchHandler)));
98+
}
99+
100+
if ($batchSize >= ($handlerDescriptor->getOption('batch_size') ?? 0)) {
101+
$batchHandler->handleBatch();
102+
}
103+
}
66104
} catch (\Throwable $e) {
67105
$exceptions[] = $e;
68106
}
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that ack'ing for this message should not be done automatically.
16+
*/
17+
class NoAutoAckStamp implements NonSendableStampInterface
18+
{
19+
}

‎src/Symfony/Component/Messenger/Stamp/ReceivedStamp.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Stamp/ReceivedStamp.php
+12-1Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,25 @@
2626
final class ReceivedStamp implements NonSendableStampInterface
2727
{
2828
private $transportName;
29+
private $ack;
2930

30-
public function __construct(string $transportName)
31+
public function __construct(string $transportName, \Closure $ack = null)
3132
{
3233
$this->transportName = $transportName;
34+
$this->ack = $ack;
3335
}
3436

3537
public function getTransportName(): string
3638
{
3739
return $this->transportName;
3840
}
41+
42+
public function ack(Envelope $envelope, \Throwable $e = null): void
43+
{
44+
if ($this->ack) {
45+
($this->ack)($envelope, $e);
46+
} elseif ($e) {
47+
throw $e;
48+
}
49+
}
3950
}

‎src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Handler/HandlersLocatorTest.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ public function testItYieldsHandlerDescriptors()
2727
DummyMessage::class => [$handler],
2828
]);
2929

30-
$this->assertEquals([new HandlerDescriptor($handler)], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
30+
$descriptor = new HandlerDescriptor($handler);
31+
$descriptor->getName();
32+
33+
$this->assertEquals([$descriptor], iterator_to_array($locator->getHandlers(new Envelope(new DummyMessage('a')))));
3134
}
3235

3336
public function testItReturnsOnlyHandlersMatchingTransport()
@@ -43,6 +46,9 @@ public function testItReturnsOnlyHandlersMatchingTransport()
4346
],
4447
]);
4548

49+
$first->getName();
50+
$second->getName();
51+
4652
$this->assertEquals([
4753
$first,
4854
$second,

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+10-8Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,24 +48,26 @@ public function testWorkerDispatchTheReceivedMessage()
4848
]);
4949

5050
$bus = $this->createMock(MessageBusInterface::class);
51+
$envelopes = [];
5152

5253
$bus->expects($this->exactly(2))
5354
->method('dispatch')
54-
->withConsecutive(
55-
[new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])],
56-
[new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])]
57-
)
58-
->willReturnOnConsecutiveCalls(
59-
$this->returnArgument(0),
60-
$this->returnArgument(0)
61-
);
55+
->willReturnCallback(function ($envelope) use (&$envelopes) {
56+
return $envelopes[] = $envelope;
57+
});
6258

6359
$dispatcher = new EventDispatcher();
6460
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(2));
6561

6662
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
6763
$worker->run();
6864

65+
$this->assertSame($apiMessage, $envelopes[0]->getMessage());
66+
$this->assertSame($ipaMessage, $envelopes[1]->getMessage());
67+
$this->assertCount(1, $envelopes[0]->all(ReceivedStamp::class));
68+
$this->assertCount(1, $envelopes[0]->all(ConsumedByWorkerStamp::class));
69+
$this->assertSame('transport', $envelopes[0]->last(ReceivedStamp::class)->getTransportName());
70+
6971
$this->assertSame(2, $receiver->getAcknowledgeCount());
7072
}
7173

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+44-32Lines changed: 44 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -133,45 +133,57 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
133133
return;
134134
}
135135

136-
try {
137-
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
138-
} catch (\Throwable $throwable) {
139-
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
140-
if ($rejectFirst) {
141-
// redelivered messages are rejected first so that continuous failures in an event listener or while
142-
// publishing for retry does not cause infinite redelivery loops
143-
$receiver->reject($envelope);
144-
}
136+
$acked = false;
137+
$ack = function (Envelope $envelope, \Throwable $e = null) use ($receiver, $transportName, &$acked) {
138+
$acked = true;
139+
140+
if ($e) {
141+
if ($rejectFirst = $e instanceof RejectRedeliveredMessageException) {
142+
// redelivered messages are rejected first so that continuous failures in an event listener or while
143+
// publishing for retry does not cause infinite redelivery loops
144+
$receiver->reject($envelope);
145+
}
145146

146-
if ($throwable instanceof HandlerFailedException) {
147-
$envelope = $throwable->getEnvelope();
148-
}
147+
if ($e instanceof HandlerFailedException) {
148+
$envelope = $e->getEnvelope();
149+
}
150+
151+
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $e);
152+
$this->dispatchEvent($failedEvent);
153+
$envelope = $failedEvent->getEnvelope();
149154

150-
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
151-
$this->dispatchEvent($failedEvent);
152-
$envelope = $failedEvent->getEnvelope();
155+
if (!$rejectFirst) {
156+
$receiver->reject($envelope);
157+
}
153158

154-
if (!$rejectFirst) {
155-
$receiver->reject($envelope);
159+
return;
156160
}
157161

158-
return;
159-
}
162+
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
163+
$this->dispatchEvent($handledEvent);
164+
$envelope = $handledEvent->getEnvelope();
165+
166+
if (null !== $this->logger) {
167+
$message = $envelope->getMessage();
168+
$context = [
169+
'message' => $message,
170+
'class' => \get_class($message),
171+
];
172+
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
173+
}
160174

161-
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
162-
$this->dispatchEvent($handledEvent);
163-
$envelope = $handledEvent->getEnvelope();
164-
165-
if (null !== $this->logger) {
166-
$message = $envelope->getMessage();
167-
$context = [
168-
'message' => $message,
169-
'class' => \get_class($message),
170-
];
171-
$this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
172-
}
175+
$receiver->ack($envelope);
176+
};
173177

174-
$receiver->ack($envelope);
178+
try {
179+
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName, $ack), new ConsumedByWorkerStamp()));
180+
181+
if (!$acked && !$envelope->last(NoAutoAckStamp::class)) {
182+
$ack($envelope);
183+
}
184+
} catch (\Throwable $e) {
185+
$ack($envelope, $e);
186+
}
175187
}
176188

177189
public function stop(): void

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.