Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 638d008

Browse filesBrowse files
[Messenger] allow processing messages in batches
1 parent 2d0a08b commit 638d008
Copy full SHA for 638d008

14 files changed

+754
-65
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
5.4
55
---
66

7+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
78
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
89
* Add support for resetting container services after each messenger message.
910
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
+80Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
public function __construct(string $handlerClass, \Closure $ack = null)
27+
{
28+
$this->handlerClass = $handlerClass;
29+
$this->ack = $ack ?? static function () {};
30+
}
31+
32+
/**
33+
* @param mixed $result
34+
*/
35+
public function ack($result = null): void
36+
{
37+
$this->doAck(null, $result);
38+
}
39+
40+
public function nack(\Throwable $error): void
41+
{
42+
$this->doAck($error);
43+
}
44+
45+
public function getError(): ?\Throwable
46+
{
47+
return $this->error;
48+
}
49+
50+
/**
51+
* @return mixed
52+
*/
53+
public function getResult()
54+
{
55+
return $this->result;
56+
}
57+
58+
public function isAcknowledged(): bool
59+
{
60+
return null === $this->ack;
61+
}
62+
63+
public function __destruct()
64+
{
65+
if ($this->ack instanceof \Closure) {
66+
$this->doAck(new LogicException(sprintf('The acknowledger was not called by the batch handler "%s".', $this->handlerClass)));
67+
}
68+
}
69+
70+
private function doAck(\Throwable $e = null, $result = null): void
71+
{
72+
if (!$ack = $this->ack) {
73+
throw new LogicException(sprintf('The acknowledger cannot be called twice by the batch handler "%s".', $this->handlerClass));
74+
}
75+
$this->ack = null;
76+
$this->error = $e;
77+
$this->result = $result;
78+
$ack($e, $result);
79+
}
80+
}
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*
31+
* @param bool $force Whether flushing is required; it can be skipped if not
32+
*/
33+
public function flush(bool $force): void;
34+
}
+79Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function flush(bool $force): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return mixed The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
$result = null !== $ack ? 0 : null;
44+
$this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
45+
46+
if (null !== $result && !$this->shouldFlush()) {
47+
return \count($this->jobs);
48+
}
49+
$this->flush(true);
50+
51+
return $result ?? $ack->getResult();
52+
}
53+
54+
private function shouldFlush(): bool
55+
{
56+
return 10 <= \count($this->jobs);
57+
}
58+
59+
/**
60+
* Schedules a message for processing.
61+
*
62+
* @return mixed A value to pass to process() for batch handling
63+
*/
64+
private function schedule(object $message)
65+
{
66+
return $message;
67+
}
68+
69+
/**
70+
* Completes the jobs in the list.
71+
*
72+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
73+
* and their corresponding acknowledgers
74+
*/
75+
private function process(array $jobs): void
76+
{
77+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
78+
}
79+
}

‎src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
+26-19Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
public function getName(): string
3638
{
37-
$name = $this->callableName($this->handler);
39+
$name = $this->name ?? $this->callableName($this->handler);
3840
$alias = $this->options['alias'] ?? null;
3941

4042
if (null !== $alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return $name;
4547
}
4648

49+
public function getBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null === $this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return $this->batchHandler;
56+
}
57+
4758
public function getOption(string $option)
4859
{
4960
return $this->options[$option] ?? null;
5061
}
5162

5263
private function callableName(callable $handler): string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
65+
if (!$handler instanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return $handler;
69+
$r = new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name, '{closure}')) {
72+
return $this->name = 'Closure';
6473
}
6574

66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
75+
if (!$handler = $r->getClosureThis()) {
76+
$class = $r->getClosureScopeClass();
77+
78+
return $this->name = ($class ? $class->name.'::' : '').$r->name;
79+
}
7480

75-
return $r->name;
81+
if ($handler instanceof BatchHandlerInterface) {
82+
$this->batchHandler = $handler;
7683
}
7784

78-
return \get_class($handler).'::__invoke';
85+
return $this->name = \get_class($handler).'::'.$r->name;
7986
}
8087
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+56-6Lines changed: 56 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
24+
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2125
use Symfony\Component\Messenger\Stamp\HandledStamp;
26+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2227

2328
/**
2429
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,14 +65,58 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6065

6166
try {
6267
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
68+
$batchHandler = $handlerDescriptor->getBatchHandler();
69+
$ack = null;
70+
71+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
72+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
73+
if (null !== $e) {
74+
$e = new HandlerFailedException($envelope, [$e]);
75+
} else {
76+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
77+
}
78+
79+
$ackStamp->ack($envelope, $e);
80+
});
81+
}
82+
83+
if (null === $ack) {
84+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
85+
} else {
86+
$batchSize = $handler($message, $ack);
87+
88+
if (!\is_int($batchSize) || 0 > $batchSize) {
89+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debug_type($batchHandler)));
90+
}
91+
92+
if (!$ack->isAcknowledged()) {
93+
$envelope = $envelope->with(new NoAutoAckStamp($handlerDescriptor));
94+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $batchSize);
95+
} elseif ($ack->getError()) {
96+
throw $ack->getError();
97+
} else {
98+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $ack->getResult());
99+
}
100+
}
101+
64102
$envelope = $envelope->with($handledStamp);
65103
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66104
} catch (\Throwable $e) {
67105
$exceptions[] = $e;
68106
}
69107
}
70108

109+
if ($flushStamp = $envelope->last(FlushBatchHandlersStamp::class)) {
110+
foreach ($envelope->all(NoAutoAckStamp::class) as $stamp) {
111+
try {
112+
$handler = $stamp->getHandlerDescriptor()->getBatchHandler();
113+
$handler->flush($flushStamp->force());
114+
} catch (\Throwable $e) {
115+
$exceptions[] = $e;
116+
}
117+
}
118+
}
119+
71120
if (null === $handler) {
72121
if (!$this->allowNoHandlers) {
73122
throw new NoHandlerForMessageException(sprintf('No handler for message "%s".', $context['class']));
@@ -85,11 +134,12 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
85134

86135
private function messageHasAlreadyBeenHandled(Envelope $envelope, HandlerDescriptor $handlerDescriptor): bool
87136
{
88-
$some = array_filter($envelope
89-
->all(HandledStamp::class), function (HandledStamp $stamp) use ($handlerDescriptor) {
90-
return $stamp->getHandlerName() === $handlerDescriptor->getName();
91-
});
137+
foreach ($envelope->all(HandledStamp::class) as $stamp) {
138+
if ($stamp->getHandlerName() === $handlerDescriptor->getName()) {
139+
return true;
140+
}
141+
}
92142

93-
return \count($some) > 0;
143+
return false;
94144
}
95145
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.