Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 89b22b9

Browse filesBrowse files
committed
[Messenger] implementation of messenger:consume, which processes messages concurrently
1 parent b167190 commit 89b22b9
Copy full SHA for 89b22b9

File tree

11 files changed

+237
-13
lines changed
Filter options

11 files changed

+237
-13
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21582158

21592159
if ($busId === $config['default_bus']) {
21602160
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
2161-
$container->setAlias(MessageBusInterface::class, $busId);
2161+
$container->setAlias(MessageBusInterface::class, $busId)->setPublic(true);
21622162
} else {
21632163
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21642164
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+9Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
3434
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
3535
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
36+
use Symfony\Component\Messenger\ParallelMessageBus;
3637
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
3738
use Symfony\Component\Messenger\RoutableMessageBus;
3839
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
@@ -54,6 +55,7 @@
5455
abstract_arg('per message senders map'),
5556
abstract_arg('senders service locator'),
5657
])
58+
5759
->set('messenger.middleware.send_message', SendMessageMiddleware::class)
5860
->abstract()
5961
->args([
@@ -134,6 +136,13 @@
134136
])
135137
->tag('messenger.transport_factory')
136138

139+
->set('parallel_bus', ParallelMessageBus::class)
140+
->args([
141+
[],
142+
service('kernel'),
143+
])
144+
->tag('messenger.bus')
145+
137146
->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
138147
->tag('messenger.transport_factory')
139148
->tag('kernel.reset', ['method' => 'reset'])

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ protected function configure(): void
8585
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
8686
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
8787
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
88+
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
8889
])
8990
->setHelp(<<<'EOF'
9091
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -250,6 +251,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
250251
$options['queues'] = $queues;
251252
}
252253

254+
$options['parallel-limit'] = $input->getOption('parallel-limit');
255+
253256
try {
254257
$this->worker->run($options);
255258
} finally {
+85Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
/*
13+
* This file is part of the Symfony package.
14+
*
15+
* (c) Fabien Potencier <fabien@symfony.com>
16+
*
17+
* For the full copyright and license information, please view the LICENSE
18+
* file that was distributed with this source code.
19+
*/
20+
21+
namespace Symfony\Component\Messenger;
22+
23+
use Amp\Cache\LocalCache;
24+
use Amp\Cancellation;
25+
use Amp\Parallel\Worker\Task;
26+
use Amp\Sync\Channel;
27+
use App\Kernel;
28+
use Psr\Container\ContainerInterface;
29+
use Symfony\Component\Dotenv\Dotenv;
30+
use Symfony\Component\Messenger\Exception\LogicException;
31+
use Symfony\Component\Messenger\Stamp\AckStamp;
32+
33+
class DispatchTask implements Task
34+
{
35+
private static ?LocalCache $cache = null;
36+
37+
public function __construct(private Envelope $envelope, private array $stamps, private readonly string $env, private readonly bool $isDebug)
38+
{
39+
}
40+
41+
public function run(Channel $channel, Cancellation $cancellation): mixed
42+
{
43+
$container = $this->getContainer();
44+
$envelope = $this->dispatch($container, $channel);
45+
46+
return $envelope->withoutStampsOfType(AckStamp::class);
47+
}
48+
49+
private function dispatch(ContainerInterface $container, $channel)
50+
{
51+
if (!$container->has(MessageBusInterface::class)) {
52+
throw new LogicException(sprintf("%s can't be found.", MessageBusInterface::class));
53+
}
54+
55+
$messageBus = $container->get(MessageBusInterface::class);
56+
57+
if ($messageBus instanceof TraceableMessageBus) {
58+
$messageBus = $messageBus->getMessageBus();
59+
}
60+
61+
return $messageBus->dispatch($this->envelope, $this->stamps);
62+
}
63+
64+
private function getContainer()
65+
{
66+
$cache = self::$cache ??= new LocalCache();
67+
$container = $cache->get('cache-container');
68+
69+
// if not in cache, create container
70+
if (!$container) {
71+
$kernel = new Kernel($this->env, $this->isDebug);
72+
$kernel->boot();
73+
74+
$container = $kernel->getContainer();
75+
$cache->set('cache-container', $container);
76+
77+
(new Dotenv())
78+
->setProdEnvs(['prod'])
79+
->usePutenv(false)
80+
->bootEnv('.env');
81+
}
82+
83+
return $container;
84+
}
85+
}

‎src/Symfony/Component/Messenger/MessageBusInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/MessageBusInterface.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ interface MessageBusInterface
2727
*
2828
* @throws ExceptionInterface
2929
*/
30-
public function dispatch(object $message, array $stamps = []): Envelope;
30+
public function dispatch(object $message, array $stamps = []): Envelope|array;
3131
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2121
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2222
use Symfony\Component\Messenger\Stamp\AckStamp;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2324
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2425
use Symfony\Component\Messenger\Stamp\HandledStamp;
2526
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
@@ -32,6 +33,8 @@ class HandleMessageMiddleware implements MiddlewareInterface
3233
{
3334
use LoggerAwareTrait;
3435

36+
private const PARALLEL_BUS = 'parallel_bus';
37+
3538
public function __construct(
3639
private HandlersLocatorInterface $handlersLocator,
3740
private bool $allowNoHandlers = false,
@@ -64,6 +67,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6467

6568
/** @var AckStamp $ackStamp */
6669
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
70+
if ($envelope->last(BusNameStamp::class) && self::PARALLEL_BUS === $envelope->last(BusNameStamp::class)->getBusName()) {
71+
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
72+
}
73+
6774
$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
6875
if (null !== $e) {
6976
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
@@ -75,7 +82,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7582
});
7683

7784
$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));
78-
7985
if (!\is_int($result) || 0 > $result) {
8086
throw new LogicException(sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
8187
}
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Parallel\Worker\ContextWorkerPool;
15+
use Symfony\Component\HttpKernel\KernelInterface;
16+
17+
use function Amp\async;
18+
use function Amp\Parallel\Worker\workerPool;
19+
20+
class ParallelMessageBus implements MessageBusInterface
21+
{
22+
public static ?ContextWorkerPool $worker = null;
23+
24+
public function __construct(private array $something, private readonly KernelInterface $kernel)
25+
{
26+
}
27+
28+
public function dispatch(object $message, array $stamps = []): Envelope|array
29+
{
30+
$worker = (self::$worker ??= workerPool());
31+
32+
$envelope = Envelope::wrap($message, $stamps);
33+
$task = new DispatchTask($envelope, $stamps, $this->kernel->getEnvironment(), $this->kernel->isDebug());
34+
35+
$future = async(function () use ($worker, $task) {
36+
return $worker->submit($task);
37+
});
38+
39+
return [$future, $envelope];
40+
}
41+
}

‎src/Symfony/Component/Messenger/RoutableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/RoutableMessageBus.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public function __construct(ContainerInterface $busLocator, ?MessageBusInterface
3434
$this->fallbackBus = $fallbackBus;
3535
}
3636

37-
public function dispatch(object $envelope, array $stamps = []): Envelope
37+
public function dispatch(object $envelope, array $stamps = []): Envelope|array
3838
{
3939
if (!$envelope instanceof Envelope) {
4040
throw new InvalidArgumentException('Messages passed to RoutableMessageBus::dispatch() must be inside an Envelope.');

‎src/Symfony/Component/Messenger/TraceableMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/TraceableMessageBus.php
+9-2Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public function __construct(MessageBusInterface $decoratedBus)
2424
$this->decoratedBus = $decoratedBus;
2525
}
2626

27-
public function dispatch(object $message, array $stamps = []): Envelope
27+
public function dispatch(object $message, array $stamps = []): Envelope|array
2828
{
2929
$envelope = Envelope::wrap($message, $stamps);
3030
$context = [
@@ -41,7 +41,9 @@ public function dispatch(object $message, array $stamps = []): Envelope
4141

4242
throw $e;
4343
} finally {
44-
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
44+
if ($envelope instanceof Envelope) {
45+
$this->dispatchedMessages[] = $context + ['stamps_after_dispatch' => array_merge([], ...array_values($envelope->all()))];
46+
}
4547
}
4648
}
4749

@@ -102,4 +104,9 @@ private function getCaller(): array
102104
'line' => $line,
103105
];
104106
}
107+
108+
public function getMessageBus(): MessageBusInterface
109+
{
110+
return $this->decoratedBus;
111+
}
105112
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.