Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9cf1bc7

Browse filesBrowse files
[Messenger] allow processing messages in batches
1 parent 8523047 commit 9cf1bc7
Copy full SHA for 9cf1bc7

12 files changed

+570
-65
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

+80Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
public function __construct(string $handlerClass, \Closure $ack = null)
27+
{
28+
$this->handlerClass = $handlerClass;
29+
$this->ack = $ack ?? static function () {};
30+
}
31+
32+
/**
33+
* @param mixed $result
34+
*/
35+
public function ack($result = null): void
36+
{
37+
$this->doAck(null, $result);
38+
}
39+
40+
public function nack(\Throwable $error): void
41+
{
42+
$this->doAck($error);
43+
}
44+
45+
public function getError(): ?\Throwable
46+
{
47+
return $this->error;
48+
}
49+
50+
/**
51+
* @return mixed
52+
*/
53+
public function getResult()
54+
{
55+
return $this->result;
56+
}
57+
58+
public function isAcknowledged(): bool
59+
{
60+
return null === $this->ack;
61+
}
62+
63+
public function __destruct()
64+
{
65+
if ($this->ack instanceof \Closure) {
66+
$this->doAck(new LogicException(sprintf('The ack function was not called by the batch handler "%s".', $this->handlerClass)));
67+
}
68+
}
69+
70+
private function doAck(\Throwable $e = null, $result = null): void
71+
{
72+
if (!$ack = $this->ack) {
73+
throw new LogicException(sprintf('The ack function cannot be called twice by the batch handler "%s".', $this->handlerClass));
74+
}
75+
$this->ack = null;
76+
$this->error = $e;
77+
$this->result = $result;
78+
$ack($e, $result);
79+
}
80+
}
+27Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/*
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
}
+71Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
25+
* The message should be handled synchronously when null.
26+
*
27+
* @return int The number of pending messages in the batch if $ack is not null,
28+
* the result from handling the message otherwise
29+
*/
30+
private function handle(object $message, ?Acknowledger $ack)
31+
{
32+
$result = null !== $ack ? 0 : null;
33+
$this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
34+
35+
if (null !== $result && !$this->shouldProcess()) {
36+
return \count($this->jobs);
37+
}
38+
39+
$jobs = $this->jobs;
40+
$this->jobs = [];
41+
$this->process($jobs);
42+
43+
return $result ?? $ack->getResult();
44+
}
45+
46+
private function shouldProcess(): bool
47+
{
48+
return 10 <= \count($this->jobs);
49+
}
50+
51+
/**
52+
* Schedules a message for processing.
53+
*
54+
* @return mixed A value to pass to process() for batch handling
55+
*/
56+
private function schedule(object $message)
57+
{
58+
return $message;
59+
}
60+
61+
/**
62+
* Completes the jobs in the list.
63+
*
64+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
65+
* and their corresponding acknowledgers
66+
*/
67+
private function process(array $jobs): void
68+
{
69+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
70+
}
71+
}

‎src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
+26-19Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
@@ -34,7 +36,7 @@ public function getHandler(): callable
3436

3537
public function getName(): string
3638
{
37-
$name = $this->callableName($this->handler);
39+
$name = $this->name ?? $this->callableName($this->handler);
3840
$alias = $this->options['alias'] ?? null;
3941

4042
if (null !== $alias) {
@@ -44,37 +46,42 @@ public function getName(): string
4446
return $name;
4547
}
4648

49+
public function getBatchHandler(): ?BatchHandlerInterface
50+
{
51+
if (null === $this->name) {
52+
$this->callableName($this->handler);
53+
}
54+
55+
return $this->batchHandler;
56+
}
57+
4758
public function getOption(string $option)
4859
{
4960
return $this->options[$option] ?? null;
5061
}
5162

5263
private function callableName(callable $handler): string
5364
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
65+
if (!$handler instanceof \Closure) {
66+
$handler = \Closure::fromCallable($handler);
6067
}
6168

62-
if (\is_string($handler)) {
63-
return $handler;
69+
$r = new \ReflectionFunction($handler);
70+
71+
if (str_contains($r->name, '{closure}')) {
72+
return $this->name = 'Closure';
6473
}
6574

66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
75+
if (!$handler = $r->getClosureThis()) {
76+
$class = $r->getClosureScopeClass();
77+
78+
return $this->name = ($class ? $class->name.'::' : '').$r->name;
79+
}
7480

75-
return $r->name;
81+
if ($handler instanceof BatchHandlerInterface) {
82+
$this->batchHandler = $handler;
7683
}
7784

78-
return \get_class($handler).'::__invoke';
85+
return $this->name = \get_class($handler).'::'.$r->name;
7986
}
8087
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+38-1Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
use Psr\Log\NullLogger;
1616
use Symfony\Component\Messenger\Envelope;
1717
use Symfony\Component\Messenger\Exception\HandlerFailedException;
18+
use Symfony\Component\Messenger\Exception\LogicException;
1819
use Symfony\Component\Messenger\Exception\NoHandlerForMessageException;
20+
use Symfony\Component\Messenger\Handler\Acknowledger;
1921
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2022
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
23+
use Symfony\Component\Messenger\Stamp\AckStamp;
2124
use Symfony\Component\Messenger\Stamp\HandledStamp;
25+
use Symfony\Component\Messenger\Stamp\NoAutoAckStamp;
2226

2327
/**
2428
* @author Samuel Roze <samuel.roze@gmail.com>
@@ -60,7 +64,40 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6064

6165
try {
6266
$handler = $handlerDescriptor->getHandler();
63-
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
67+
$batchHandler = $handlerDescriptor->getBatchHandler();
68+
$ack = null;
69+
70+
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
71+
$ack = new Acknowledger(get_debug_type($batchHandler), static function (\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
72+
if (null !== $e) {
73+
$e = new HandlerFailedException($envelope, [$e]);
74+
} else {
75+
$envelope = $envelope->with(HandledStamp::fromDescriptor($handlerDescriptor, $result));
76+
}
77+
78+
$ackStamp->ack($envelope, $e);
79+
});
80+
}
81+
82+
if (null === $ack) {
83+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));
84+
} else {
85+
$batchSize = $handler($message, $ack);
86+
87+
if (!\is_int($batchSize) || 0 > $batchSize) {
88+
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($batchSize) ? $batchSize : get_debug_type($batchSize), get_debug_type($batchHandler)));
89+
}
90+
91+
if (!$ack->isAcknowledged()) {
92+
$envelope = $envelope->with(new NoAutoAckStamp());
93+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $batchSize);
94+
} elseif ($ack->getError()) {
95+
throw $ack->getError();
96+
} else {
97+
$handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $ack->getResult());
98+
}
99+
}
100+
64101
$envelope = $envelope->with($handledStamp);
65102
$this->logger->info('Message {class} handled by {handler}', $context + ['handler' => $handledStamp->getHandlerName()]);
66103
} catch (\Throwable $e) {
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
16+
/**
17+
* Marker stamp for messages that can be ack/nack'ed.
18+
*/
19+
final class AckStamp implements NonSendableStampInterface
20+
{
21+
private $ack;
22+
23+
public function __construct(\Closure $ack)
24+
{
25+
$this->ack = $ack;
26+
}
27+
28+
public function ack(Envelope $envelope, \Throwable $e = null): void
29+
{
30+
($this->ack)($envelope, $e);
31+
}
32+
}
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that ack'ing for this message should not be done automatically.
16+
*/
17+
final class NoAutoAckStamp implements NonSendableStampInterface
18+
{
19+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.