Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ce13bc6

Browse filesBrowse files
[Messenger] allow processing messages in batches
1 parent b2fc70b commit ce13bc6
Copy full SHA for ce13bc6

14 files changed

+765
-75
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,8 @@ public function testItReceivesSignals()
215215
with stamps: [
216216
"Symfony\\Component\\Messenger\\Bridge\\Amqp\\Transport\\AmqpReceivedStamp",
217217
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
218-
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
218+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp",
219+
"Symfony\\Component\\Messenger\\Stamp\\AckStamp"
219220
]
220221
Done.
221222

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `AsMessageHandler` attribute for declaring message handlers on PHP 8.
8+
* Add support for handling messages in batches with `BatchHandlerInterface` and corresponding trait
89
* Add `StopWorkerExceptionInterface` and its implementation `StopWorkerException` to stop the worker.
910
* Add support for resetting container services after each messenger message.
1011
* Added `WorkerMetadata` class which allows you to access the configuration details of a worker, like `queueNames` and `transportNames` it consumes from.
+83Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
class Acknowledger
20+
{
21+
private $handlerClass;
22+
private $ack;
23+
private $error = null;
24+
private $result = null;
25+
26+
/**
27+
* @param null|\Closure(\Throwable|null, mixed):void $ack
28+
*/
29+
public function __construct(string $handlerClass, \Closure $ack = null)
30+
{
31+
$this->handlerClass = $handlerClass;
32+
$this->ack = $ack ?? static function () {};
33+
}
34+
35+
/**
36+
* @param mixed $result
37+
*/
38+
public function ack($result = null): void
39+
{
40+
$this->doAck(null, $result);
41+
}
42+
43+
public function nack(\Throwable $error): void
44+
{
45+
$this->doAck($error);
46+
}
47+
48+
public function getError(): ?\Throwable
49+
{
50+
return $this->error;
51+
}
52+
53+
/**
54+
* @return mixed
55+
*/
56+
public function getResult()
57+
{
58+
return $this->result;
59+
}
60+
61+
public function isAcknowledged(): bool
62+
{
63+
return null === $this->ack;
64+
}
65+
66+
public function __destruct()
67+
{
68+
if ($this->ack instanceof \Closure) {
69+
throw new LogicException(sprintf('The acknowledger was not called by batch handler "%s".', $this->handlerClass));
70+
}
71+
}
72+
73+
private function doAck(\Throwable $e = null, $result = null): void
74+
{
75+
if (!$ack = $this->ack) {
76+
throw new LogicException(sprintf('The acknowledger cannot be called twice by batch handler "%s".', $this->handlerClass));
77+
}
78+
$this->ack = null;
79+
$this->error = $e;
80+
$this->result = $result;
81+
$ack($e, $result);
82+
}
83+
}
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
/**
15+
* @author Nicolas Grekas <p@tchwork.com>
16+
*/
17+
interface BatchHandlerInterface
18+
{
19+
/**
20+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
21+
* The message should be handled synchronously when null.
22+
*
23+
* @return mixed The number of pending messages in the batch if $ack is not null,
24+
* the result from handling the message otherwise
25+
*/
26+
//public function __invoke(object $message, Acknowledger $ack = null): mixed;
27+
28+
/**
29+
* Flushes any pending buffers.
30+
*
31+
* @param bool $force Whether flushing is required; it can be skipped if not
32+
*/
33+
public function flush(bool $force): void;
34+
}
+86Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Handler;
13+
14+
use Symfony\Component\Messenger\Exception\LogicException;
15+
16+
/**
17+
* @author Nicolas Grekas <p@tchwork.com>
18+
*/
19+
trait BatchHandlerTrait
20+
{
21+
private $jobs = [];
22+
23+
/**
24+
* {@inheritdoc}
25+
*/
26+
public function flush(bool $force): void
27+
{
28+
if ($jobs = $this->jobs) {
29+
$this->jobs = [];
30+
$this->process($jobs);
31+
}
32+
}
33+
34+
/**
35+
* @param Acknowledger|null $ack The function to call to ack/nack the $message.
36+
* The message should be handled synchronously when null.
37+
*
38+
* @return mixed The number of pending messages in the batch if $ack is not null,
39+
* the result from handling the message otherwise
40+
*/
41+
private function handle(object $message, ?Acknowledger $ack)
42+
{
43+
if (null === $ack) {
44+
$ack = new Acknowledger(get_debug_type($this));
45+
$this->jobs[] = [$this->schedule($message), $ack];
46+
$this->flush(true);
47+
48+
return $ack->getResult();
49+
}
50+
51+
$this->jobs[] = [$this->schedule($message), $ack];
52+
if (!$this->shouldFlush()) {
53+
return \count($this->jobs);
54+
}
55+
56+
$this->flush(true);
57+
58+
return 0;
59+
}
60+
61+
private function shouldFlush(): bool
62+
{
63+
return 10 <= \count($this->jobs);
64+
}
65+
66+
/**
67+
* Schedules a message for processing.
68+
*
69+
* @return mixed A value to pass to process() for batch handling
70+
*/
71+
private function schedule(object $message)
72+
{
73+
return $message;
74+
}
75+
76+
/**
77+
* Completes the jobs in the list.
78+
*
79+
* @list<array{0: mixed, 1: Acknowledger}> $jobs A list of pairs of values as returned by schedule()
80+
* and their corresponding acknowledgers
81+
*/
82+
private function process(array $jobs): void
83+
{
84+
throw new LogicException(sprintf('"%s" should implement abstract method "process()".', get_debug_type($this)));
85+
}
86+
}

‎src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php
+27-29Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,34 @@
1919
final class HandlerDescriptor
2020
{
2121
private $handler;
22+
private $name;
23+
private $batchHandler;
2224
private $options;
2325

2426
public function __construct(callable $handler, array $options = [])
2527
{
28+
if (!$handler instanceof \Closure) {
29+
$handler = \Closure::fromCallable($handler);
30+
}
31+
2632
$this->handler = $handler;
2733
$this->options = $options;
34+
35+
$r = new \ReflectionFunction($handler);
36+
37+
if (str_contains($r->name, '{closure}')) {
38+
$this->name = 'Closure';
39+
} elseif (!$handler = $r->getClosureThis()) {
40+
$class = $r->getClosureScopeClass();
41+
42+
$this->name = ($class ? $class->name.'::' : '').$r->name;
43+
} else {
44+
if ($handler instanceof BatchHandlerInterface) {
45+
$this->batchHandler = $handler;
46+
}
47+
48+
$this->name = \get_class($handler).'::'.$r->name;
49+
}
2850
}
2951

3052
public function getHandler(): callable
@@ -34,7 +56,7 @@ public function getHandler(): callable
3456

3557
public function getName(): string
3658
{
37-
$name = $this->callableName($this->handler);
59+
$name = $this->name;
3860
$alias = $this->options['alias'] ?? null;
3961

4062
if (null !== $alias) {
@@ -44,37 +66,13 @@ public function getName(): string
4466
return $name;
4567
}
4668

47-
public function getOption(string $option)
69+
public function getBatchHandler(): ?BatchHandlerInterface
4870
{
49-
return $this->options[$option] ?? null;
71+
return $this->batchHandler;
5072
}
5173

52-
private function callableName(callable $handler): string
74+
public function getOption(string $option)
5375
{
54-
if (\is_array($handler)) {
55-
if (\is_object($handler[0])) {
56-
return \get_class($handler[0]).'::'.$handler[1];
57-
}
58-
59-
return $handler[0].'::'.$handler[1];
60-
}
61-
62-
if (\is_string($handler)) {
63-
return $handler;
64-
}
65-
66-
if ($handler instanceof \Closure) {
67-
$r = new \ReflectionFunction($handler);
68-
if (str_contains($r->name, '{closure}')) {
69-
return 'Closure';
70-
}
71-
if ($class = $r->getClosureScopeClass()) {
72-
return $class->name.'::'.$r->name;
73-
}
74-
75-
return $r->name;
76-
}
77-
78-
return \get_class($handler).'::__invoke';
76+
return $this->options[$option] ?? null;
7977
}
8078
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.