Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 522a316

Browse filesBrowse files
committed
test: non blocking
1 parent 5f63be6 commit 522a316
Copy full SHA for 522a316

File tree

Expand file treeCollapse file tree

14 files changed

+524
-9
lines changed
Filter options
Expand file treeCollapse file tree

14 files changed

+524
-9
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+12-1Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;
1313

14+
use Amp\Parallel\Worker\Task;
1415
use Composer\InstalledVersions;
1516
use Http\Client\HttpAsyncClient;
1617
use Http\Client\HttpClient;
@@ -113,6 +114,7 @@
113114
use Symfony\Component\Messenger\MessageBus;
114115
use Symfony\Component\Messenger\MessageBusInterface;
115116
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
117+
use Symfony\Component\Messenger\ParallelMessageBus;
116118
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
117119
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
118120
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -2090,6 +2092,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20902092

20912093
$loader->load('messenger.php');
20922094

2095+
if (!class_exists(ParallelMessageBus::class)) {
2096+
$container->removeDefinition('parallel_bus');
2097+
}
2098+
20932099
if (!interface_exists(DenormalizerInterface::class)) {
20942100
$container->removeDefinition('serializer.normalizer.flatten_exception');
20952101
}
@@ -2161,7 +2167,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21612167

21622168
if ($busId === $config['default_bus']) {
21632169
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
2164-
$container->setAlias(MessageBusInterface::class, $busId);
2170+
2171+
$messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId);
2172+
2173+
if (class_exists(Task::class)) {
2174+
$messageBusAlias->setPublic(true);
2175+
}
21652176
} else {
21662177
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21672178
}

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+11Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
3434
use Symfony\Component\Messenger\Middleware\TraceableMiddleware;
3535
use Symfony\Component\Messenger\Middleware\ValidationMiddleware;
36+
use Symfony\Component\Messenger\ParallelMessageBus;
3637
use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy;
3738
use Symfony\Component\Messenger\RoutableMessageBus;
3839
use Symfony\Component\Messenger\Transport\InMemory\InMemoryTransportFactory;
@@ -54,6 +55,7 @@
5455
abstract_arg('per message senders map'),
5556
abstract_arg('senders service locator'),
5657
])
58+
5759
->set('messenger.middleware.send_message', SendMessageMiddleware::class)
5860
->abstract()
5961
->args([
@@ -134,6 +136,15 @@
134136
])
135137
->tag('messenger.transport_factory')
136138

139+
->set('parallel_bus', ParallelMessageBus::class)
140+
->args([
141+
[],
142+
param('kernel.environment'),
143+
param('kernel.debug'),
144+
param('kernel.project_dir'),
145+
])
146+
->tag('messenger.bus')
147+
137148
->set('messenger.transport.in_memory.factory', InMemoryTransportFactory::class)
138149
->args([
139150
service('clock')->nullOnInvalid(),

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ protected function configure(): void
7575
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7676
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7777
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
78+
new InputOption('parallel-limit', 'p', InputOption::VALUE_REQUIRED, 'The number of concurrent processes', 10),
7879
])
7980
->setHelp(<<<'EOF'
8081
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -240,6 +241,8 @@ protected function execute(InputInterface $input, OutputInterface $output): int
240241
$options['queues'] = $queues;
241242
}
242243

244+
$options['parallel-limit'] = $input->getOption('parallel-limit');
245+
243246
try {
244247
$this->worker->run($options);
245248
} finally {
+88Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Cache\LocalCache;
15+
use Amp\Cancellation;
16+
use Amp\Parallel\Worker\Task;
17+
use Amp\Sync\Channel;
18+
use App\Kernel;
19+
use Psr\Container\ContainerInterface;
20+
use Symfony\Component\Dotenv\Dotenv;
21+
use Symfony\Component\Messenger\Stamp\AckStamp;
22+
23+
class DispatchTask implements Task
24+
{
25+
private static ?LocalCache $cache = null;
26+
27+
public function __construct(
28+
private Envelope $envelope,
29+
private array $stamps,
30+
private readonly string $env,
31+
private readonly bool $isDebug,
32+
private readonly string $projectDir,
33+
) {
34+
if (!class_exists(LocalCache::class)) {
35+
throw new \LogicException(\sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class));
36+
}
37+
}
38+
39+
public function run(Channel $channel, Cancellation $cancellation): mixed
40+
{
41+
$container = $this->getContainer();
42+
$envelope = $this->dispatch($container, $channel);
43+
44+
return $envelope->withoutStampsOfType(AckStamp::class);
45+
}
46+
47+
private function dispatch(ContainerInterface $container, $channel)
48+
{
49+
$messageBus = $container->get(MessageBusInterface::class);
50+
51+
return $messageBus->dispatch($this->envelope, $this->stamps);
52+
}
53+
54+
private function getContainer()
55+
{
56+
$cache = self::$cache ??= new LocalCache();
57+
$container = $cache->get('cache-container');
58+
59+
// if not in cache, create container
60+
if (!$container) {
61+
if (!method_exists(Dotenv::class, 'bootEnv')) {
62+
throw new \LogicException(\sprintf("Method bootEnv de \"%s\" doesn't exist.", Dotenv::class));
63+
}
64+
65+
(new Dotenv())->bootEnv($this->projectDir.'/.env');
66+
67+
if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
68+
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
69+
} elseif (isset($_ENV['KERNEL_CLASS'])) {
70+
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug);
71+
} else {
72+
$kernel = new Kernel($this->env, $this->isDebug);
73+
}
74+
75+
$kernel->boot();
76+
77+
$container = $kernel->getContainer();
78+
$cache->set('cache-container', $container);
79+
}
80+
81+
return $container;
82+
}
83+
84+
public function getEnvelope(): Envelope
85+
{
86+
return $this->envelope;
87+
}
88+
}

‎src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
+5-1Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
2121
use Symfony\Component\Messenger\Handler\HandlersLocatorInterface;
2222
use Symfony\Component\Messenger\Stamp\AckStamp;
23+
use Symfony\Component\Messenger\Stamp\BusNameStamp;
2324
use Symfony\Component\Messenger\Stamp\FlushBatchHandlersStamp;
2425
use Symfony\Component\Messenger\Stamp\HandledStamp;
2526
use Symfony\Component\Messenger\Stamp\HandlerArgumentsStamp;
@@ -64,6 +65,10 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
6465

6566
/** @var AckStamp $ackStamp */
6667
if ($batchHandler && $ackStamp = $envelope->last(AckStamp::class)) {
68+
if ($envelope->last(BusNameStamp::class) && 'parallel_bus' === $envelope->last(BusNameStamp::class)->getBusName()) {
69+
throw new HandlerFailedException($envelope, [new LogicException("Parallel bus can't be used for batch messages")]);
70+
}
71+
6772
$ack = new Acknowledger(get_debug_type($batchHandler), static function (?\Throwable $e = null, $result = null) use ($envelope, $ackStamp, $handlerDescriptor) {
6873
if (null !== $e) {
6974
$e = new HandlerFailedException($envelope, [$handlerDescriptor->getName() => $e]);
@@ -75,7 +80,6 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope
7580
});
7681

7782
$result = $this->callHandler($handler, $message, $ack, $envelope->last(HandlerArgumentsStamp::class));
78-
7983
if (!\is_int($result) || 0 > $result) {
8084
throw new LogicException(\sprintf('A handler implementing BatchHandlerInterface must return the size of the current batch as a positive integer, "%s" returned from "%s".', \is_int($result) ? $result : get_debug_type($result), get_debug_type($batchHandler)));
8185
}
+53Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger;
13+
14+
use Amp\Parallel\Worker\ContextWorkerPool;
15+
use Symfony\Component\Messenger\Stamp\FutureStamp;
16+
17+
use function Amp\async;
18+
use function Amp\Parallel\Worker\workerPool;
19+
20+
/**
21+
* Using this bus will enable concurrent message processing without the need for multiple workers
22+
* using multiple processes or threads.
23+
* It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes.
24+
*/
25+
final class ParallelMessageBus implements MessageBusInterface
26+
{
27+
public static ?ContextWorkerPool $worker = null;
28+
29+
public function __construct(
30+
private array $something,
31+
private readonly string $env,
32+
private readonly string $debug,
33+
private readonly string $projectdir,
34+
) {
35+
if (!class_exists(ContextWorkerPool::class)) {
36+
throw new \LogicException(\sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class));
37+
}
38+
}
39+
40+
public function dispatch(object $message, array $stamps = []): Envelope
41+
{
42+
$worker = (self::$worker ??= workerPool());
43+
44+
$envelope = Envelope::wrap($message, $stamps);
45+
$task = new DispatchTask($envelope, $stamps, $this->env, $this->debug, $this->projectdir);
46+
47+
$future = async(function () use ($worker, $task) {
48+
return $worker->submit($task);
49+
});
50+
51+
return $envelope->with(new FutureStamp($future));
52+
}
53+
}
+31Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Amp\Future;
15+
16+
/**
17+
* This stamp allows passing the future representing the potential result of the handler,
18+
* which is treated as an asynchronous operation,
19+
* and will be retrieved later by the worker to ack or nack based on the obtained result.
20+
*/
21+
final readonly class FutureStamp implements StampInterface
22+
{
23+
public function __construct(private Future $future)
24+
{
25+
}
26+
27+
public function getFuture(): Future
28+
{
29+
return $this->future;
30+
}
31+
}
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
KERNEL_CLASS=Symfony\Component\Messenger\Tests\Fixtures\App\Kernel
+46Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Tests\Fixtures\App;
4+
5+
use Symfony\Bundle\FrameworkBundle\FrameworkBundle;
6+
use Symfony\Bundle\FrameworkBundle\Kernel\MicroKernelTrait;
7+
use Symfony\Component\Config\Loader\LoaderInterface;
8+
use Symfony\Component\DependencyInjection\ContainerBuilder;
9+
use Symfony\Component\HttpKernel\Kernel as SymfonyKernel;
10+
use Symfony\Component\Messenger\MessageBus;
11+
use Symfony\Component\Messenger\MessageBusInterface;
12+
13+
class Kernel extends SymfonyKernel
14+
{
15+
use MicroKernelTrait;
16+
17+
public function registerBundles(): iterable
18+
{
19+
yield new FrameworkBundle();
20+
}
21+
22+
public function registerContainerConfiguration(LoaderInterface $loader): void
23+
{
24+
$loader->load(function (ContainerBuilder $container) use ($loader) {
25+
$container->loadFromExtension('framework', [
26+
'router' => [
27+
'resource' => 'kernel::loadRoutes',
28+
'type' => 'service',
29+
],
30+
]);
31+
$container
32+
->register('message.bus', MessageBus::class);
33+
$container->setAlias(MessageBusInterface::class, 'message.bus')->setPublic(true);
34+
});
35+
}
36+
37+
public function getCacheDir(): string
38+
{
39+
return sys_get_temp_dir().'/'. \Symfony\Component\HttpKernel\Kernel::VERSION.'/EmptyAppKernel/cache/'.$this->environment;
40+
}
41+
42+
public function getLogDir(): string
43+
{
44+
return sys_get_temp_dir().'/'.Kernel::VERSION.'/EmptyAppKernel/logs';
45+
}
46+
}
+53Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests;
13+
14+
use Amp\Parallel\Worker\Worker;
15+
use PHPUnit\Framework\TestCase;
16+
use Symfony\Component\Messenger\Envelope;
17+
use Symfony\Component\Messenger\MessageBusInterface;
18+
use Symfony\Component\Messenger\ParallelMessageBus;
19+
use Symfony\Component\Messenger\Stamp\FutureStamp;
20+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
21+
22+
class ParallelMessageBusTest extends TestCase
23+
{
24+
private string $env = 'dev';
25+
private bool $debug = false;
26+
private string $projectDir = 'path/to/project';
27+
28+
public function testItHasTheRightInterface()
29+
{
30+
if (!class_exists(Worker::class)) {
31+
$this->markTestSkipped(\sprintf('%s not available.', Worker::class));
32+
}
33+
34+
$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir);
35+
36+
$this->assertInstanceOf(MessageBusInterface::class, $bus);
37+
}
38+
39+
public function testItReturnsWithFutureStamp()
40+
{
41+
if (!class_exists(Worker::class)) {
42+
$this->markTestSkipped(\sprintf('%s not available.', Worker::class));
43+
}
44+
45+
$message = new DummyMessage('Hello');
46+
47+
$bus = new ParallelMessageBus([], $this->env, $this->debug, $this->projectDir);
48+
49+
$envelope = $bus->dispatch(new Envelope($message));
50+
51+
$this->assertInstanceOf(FutureStamp::class, $envelope->last(FutureStamp::class));
52+
}
53+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.