Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 8fac51d

Browse filesBrowse files
committed
[Messenger] implementation of messenger:consume, which processes messages concurrently
1 parent 82acd7a commit 8fac51d
Copy full SHA for 8fac51d

File tree

10 files changed

+263
-10
lines changed
Filter options

10 files changed

+263
-10
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21582158

21592159
if ($busId === $config['default_bus']) {
21602160
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
2161-
$container->setAlias(MessageBusInterface::class, $busId);
2161+
$container->setAlias(MessageBusInterface::class, $busId)->setPublic(true);
21622162
} else {
21632163
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21642164
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
3434
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
3535
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
36+
use Symfony\Component\Messenger\ParallelMessageBus;
3637
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
3738
use Symfony\Component\Messenger\RoutableMessageBus;
3839
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
@@ -54,6 +55,7 @@
5455
abstract_arg('per message senders map'),
5556
abstract_arg('senders service locator'),
5657
])
58+
5759
->set('messenger.middleware.send_message', SendMessageMiddleware::class)
5860
->abstract()
5961
->args([
@@ -134,6 +136,15 @@
134136
])
135137
->tag('messenger.transport_factory')
136138

139+
->set('parallel_bus', ParallelMessageBus::class)
140+
->args([
141+
[],
142+
param('kernel.environment'),
143+
param('kernel.debug'),
144+
param('kernel.project_dir'),
145+
])
146+
->tag('messenger.bus')
147+
137148
->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
138149
->tag('messenger.transport_factory')
139150
->tag('kernel.reset', ['method' => 'reset'])

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected function configure(): void
8585
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8686
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
8787
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
88+
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
8889
])
8990
->setHelp(<<<'EOF'
9091
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -250,6 +251,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
250251
$options['queues'] = $queues;
251252
}
252253

254+
$options['parallel-limit'] = $input->getOption('parallel-limit');
255+
253256
try {
254257
$this->worker->run($options);
255258
} finally {
+80Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Cache\LocalCache;
15+
use Amp\Cancellation;
16+
use Amp\Parallel\Worker\Task;
17+
use Amp\Sync\Channel;
18+
use App\Kernel;
19+
use Psr\Container\ContainerInterface;
20+
use Symfony\Component\Dotenv\Dotenv;
21+
use Symfony\Component\Messenger\Exception\LogicException;
22+
use Symfony\Component\Messenger\Stamp\AckStamp;
23+
24+
class DispatchTask implements Task
25+
{
26+
private static ?LocalCache $cache = null;
27+
28+
public function __construct(private Envelope $envelope, private array $stamps, private readonly string $env, private readonly bool $isDebug, private readonly string $projectDir)
29+
{
30+
}
31+
32+
public function run(Channel $channel, Cancellation $cancellation): mixed
33+
{
34+
$container = $this->getContainer();
35+
$envelope = $this->dispatch($container, $channel);
36+
37+
return $envelope->withoutStampsOfType(AckStamp::class);
38+
}
39+
40+
private function dispatch(ContainerInterface $container, $channel)
41+
{
42+
if (!$container->has(MessageBusInterface::class)) {
43+
throw new LogicException(sprintf("%s can't be found.", MessageBusInterface::class));
44+
}
45+
46+
$messageBus = $container->get(MessageBusInterface::class);
47+
48+
return $messageBus->dispatch($this->envelope, $this->stamps);
49+
}
50+
51+
private function getContainer()
52+
{
53+
$cache = self::$cache ??= new LocalCache();
54+
$container = $cache->get('cache-container');
55+
56+
// if not in cache, create container
57+
if (!$container) {
58+
if (!method_exists(Dotenv::class, 'bootEnv')) {
59+
throw new \LogicException(sprintf("Method bootEnv de %s doesn't exist", Dotenv::class));
60+
}
61+
62+
(new Dotenv())->bootEnv($this->projectDir.'/.env');
63+
64+
if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
65+
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
66+
} elseif (class_exists(Kernel::class)) {
67+
$kernel = new Kernel($this->env, $this->isDebug);
68+
} else {
69+
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug);
70+
}
71+
72+
$kernel->boot();
73+
74+
$container = $kernel->getContainer();
75+
$cache->set('cache-container', $container);
76+
}
77+
78+
return $container;
79+
}
80+
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2121
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2222
use Symfony\Component\Messenger\Stamp\AckStamp;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2324
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2425
use Symfony\Component\Messenger\Stamp\HandledStamp;
2526
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
@@ -32,6 +33,8 @@ class HandleMessageMiddleware implements MiddlewareInterface
3233
{
3334
use LoggerAwareTrait;
3435

36+
private const PARALLEL_BUS = 'parallel_bus';
37+
3538
public function __construct(
3639
private HandlersLocatorInterface $handlersLocator,
3740
private bool $allowNoHandlers = false,
@@ -64,6 +67,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6467

6568
/** @var AckStamp $ackStamp */
6669
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
70+
if ($envelope->last(BusNameStamp::class) && self::PARALLEL_BUS === $envelope->last(BusNameStamp::class)->getBusName()) {
71+
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
72+
}
73+
6774
$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
6875
if (null !== $e) {
6976
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
@@ -75,7 +82,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7582
});
7683

7784
$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));
78-
7985
if (!\is_int($result) || 0 > $result) {
8086
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
8187
}
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Parallel\Worker\ContextWorkerPool;
15+
use Symfony\Component\Messenger\Stamp\FutureStamp;
16+
17+
use function Amp\async;
18+
use function Amp\Parallel\Worker\workerPool;
19+
20+
class ParallelMessageBus implements MessageBusInterface
21+
{
22+
public static ?ContextWorkerPool $worker = null;
23+
24+
public function __construct(private array $something, private readonly string $env, private readonly string $debug, private readonly string $projectdir)
25+
{
26+
}
27+
28+
public function dispatch(object $message, array $stamps = []): Envelope
29+
{
30+
$worker = (self::$worker ??= workerPool());
31+
32+
$envelope = Envelope::wrap($message, $stamps);
33+
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir);
34+
35+
$future = async(function () use ($worker, $task) {
36+
return $worker->submit($task);
37+
});
38+
39+
return $envelope->with(new FutureStamp($future));
40+
}
41+
}
+26Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Amp\Future;
15+
16+
readonly class FutureStamp implements StampInterface
17+
{
18+
public function __construct(private Future $future)
19+
{
20+
}
21+
22+
public function getFuture(): Future
23+
{
24+
return $this->future;
25+
}
26+
}

‎src/Symfony/Component/Messenger/TraceableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/TraceableMessageBus.php
+8-1Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ public function dispatch(object $message, array $stamps = []): Envelope
4141

4242
throw $e;
4343
} finally {
44-
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
44+
if ($envelope instanceof Envelope) {
45+
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
46+
}
4547
}
4648
}
4749

@@ -102,4 +104,9 @@ private function getCaller(): array
102104
'line' => $line,
103105
];
104106
}
107+
108+
public function getMessageBus(): MessageBusInterface
109+
{
110+
return $this->decoratedBus;
111+
}
105112
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.