Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 5d6bae6

Browse filesBrowse files
committed
test: non blocking
1 parent 49c8190 commit 5d6bae6
Copy full SHA for 5d6bae6

File tree

Expand file treeCollapse file tree

6 files changed

+60
-35
lines changed
Filter options
Expand file treeCollapse file tree

6 files changed

+60
-35
lines changed

‎composer.json

Copy file name to clipboardExpand all lines: composer.json
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@
125125
"amphp/amp": "^2.5",
126126
"amphp/http-client": "^4.2.1",
127127
"amphp/http-tunnel": "^1.0",
128-
"amphp/parallel": "^1.x-dev",
129128
"async-aws/ses": "^1.0",
130129
"async-aws/sqs": "^1.0|^2.0",
131130
"async-aws/sns": "^1.0",

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;
1313

14+
use Amp\Parallel\Worker\Task;
1415
use Composer\InstalledVersions;
1516
use Http\Client\HttpAsyncClient;
1617
use Http\Client\HttpClient;
@@ -2163,7 +2164,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
21632164

21642165
if ($busId === $config['default_bus']) {
21652166
$container->setAlias('messenger.default_bus', $busId)->setPublic(true);
2166-
$container->setAlias(MessageBusInterface::class, $busId)->setPublic(true);
2167+
2168+
$messageBusAlias = $container->setAlias(MessageBusInterface::class, $busId);
2169+
2170+
if (class_exists(Task::class)) {
2171+
$messageBusAlias->setPublic(true);
2172+
}
21672173
} else {
21682174
$container->registerAliasForArgument($busId, MessageBusInterface::class);
21692175
}

‎src/Symfony/Component/Messenger/DispatchTask.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/DispatchTask.php
+11-8Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use App\Kernel;
1919
use Psr\Container\ContainerInterface;
2020
use Symfony\Component\Dotenv\Dotenv;
21-
use Symfony\Component\Messenger\Exception\LogicException;
2221
use Symfony\Component\Messenger\Stamp\AckStamp;
2322

2423
class DispatchTask implements Task
@@ -27,6 +26,9 @@ class DispatchTask implements Task
2726

2827
public function __construct(private Envelope $envelope, private array $stamps, private readonly string $env, private readonly bool $isDebug, private readonly string $projectDir)
2928
{
29+
if (!class_exists(LocalCache::class)) {
30+
throw new \LogicException(sprintf('Package "amp/cache" is required to use the "%s". Try running "composer require amphp/cache".', LocalCache::class));
31+
}
3032
}
3133

3234
public function run(Channel $channel, Cancellation $cancellation): mixed
@@ -39,10 +41,6 @@ public function run(Channel $channel, Cancellation $cancellation): mixed
3941

4042
private function dispatch(ContainerInterface $container, $channel)
4143
{
42-
if (!$container->has(MessageBusInterface::class)) {
43-
throw new LogicException(sprintf("%s can't be found.", MessageBusInterface::class));
44-
}
45-
4644
$messageBus = $container->get(MessageBusInterface::class);
4745

4846
return $messageBus->dispatch($this->envelope, $this->stamps);
@@ -63,10 +61,10 @@ private function getContainer()
6361

6462
if (!class_exists(Kernel::class) && !isset($_ENV['KERNEL_CLASS'])) {
6563
throw new \LogicException('You must set the KERNEL_CLASS environment variable to the fully-qualified class name of your Kernel in .env or have "%s" class.', Kernel::class);
66-
} elseif (class_exists(Kernel::class)) {
67-
$kernel = new Kernel($this->env, $this->isDebug);
68-
} else {
64+
} elseif (isset($_ENV['KERNEL_CLASS'])) {
6965
$kernel = new $_ENV['KERNEL_CLASS']($this->env, $this->isDebug);
66+
} else {
67+
$kernel = new Kernel($this->env, $this->isDebug);
7068
}
7169

7270
$kernel->boot();
@@ -77,4 +75,9 @@ private function getContainer()
7775

7876
return $container;
7977
}
78+
79+
public function getEnvelope(): Envelope
80+
{
81+
return $this->envelope;
82+
}
8083
}

‎src/Symfony/Component/Messenger/ParallelMessageBus.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/ParallelMessageBus.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717
use function Amp\async;
1818
use function Amp\Parallel\Worker\workerPool;
1919

20-
class ParallelMessageBus implements MessageBusInterface
20+
/**
21+
* Using this bus will enable concurrent message processing without the need for multiple workers
22+
* using multiple processes or threads
23+
* It requires a ZTS build of PHP 8.2+ and ext-parallel to create threads; otherwise, it will use processes.
24+
*/
25+
final class ParallelMessageBus implements MessageBusInterface
2126
{
2227
public static ?ContextWorkerPool $worker = null;
2328

2429
public function __construct(private array $something, private readonly string $env, private readonly string $debug, private readonly string $projectdir)
2530
{
31+
if (!class_exists(ContextWorkerPool::class)) {
32+
throw new \LogicException(sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', self::class));
33+
}
2634
}
2735

2836
public function dispatch(object $message, array $stamps = []): Envelope

‎src/Symfony/Component/Messenger/Stamp/FutureStamp.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Stamp/FutureStamp.php
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313

1414
use Amp\Future;
1515

16-
readonly class FutureStamp implements StampInterface
16+
/**
17+
* This stamps allows passing the future representing the potential result of the handler,
18+
* which is treated as an asynchronous operation,
19+
* and will be retrieved later by the worker to ack or nack based on the obtained result.
20+
*/
21+
final readonly class FutureStamp implements StampInterface
1722
{
1823
public function __construct(private Future $future)
1924
{

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+27-23Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212
namespace Symfony\Component\Messenger;
1313

14-
use Amp\Parallel\Worker\ContextWorkerPool;
14+
use Amp\Future;
15+
use Amp\Parallel\Worker\Execution;
1516
use Psr\EventDispatcher\EventDispatcherInterface;
1617
use Psr\Log\LoggerInterface;
1718
use Symfony\Component\Clock\Clock;
@@ -39,6 +40,10 @@
3940
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
4041
use Symfony\Component\RateLimiter\LimiterInterface;
4142

43+
use function Amp\Future\awaitAll;
44+
use function Amp\Future\awaitAny;
45+
use function Amp\Future\awaitAnyN;
46+
4247
/**
4348
* @author Samuel Roze <samuel.roze@gmail.com>
4449
* @author Tobias Schultze <http://tobion.de>
@@ -110,7 +115,7 @@ public function run(array $options = []): void
110115

111116
if (!$envelopes) {
112117
// flush
113-
$this->handleFutures($transportName);
118+
$this->handleFutures($transportName, 0);
114119
}
115120

116121
foreach ($envelopes as $envelope) {
@@ -180,10 +185,6 @@ private function handleMessage(Envelope $envelope, string $transportName, int $p
180185
$envelope = $envelope->with(new AckStamp($ack));
181186
}
182187

183-
if ($busNameStamp && 'parallel_bus' === $busNameStamp->getBusName() && !class_exists(ContextWorkerPool::class)) {
184-
throw new \LogicException(sprintf('Package "amp/parallel" is required to use the "%s". Try running "composer require amphp/parallel".', ParallelMessageBus::class));
185-
}
186-
187188
$envelope = $this->bus->dispatch($envelope);
188189

189190
// "non concurrent" behaviour
@@ -193,6 +194,7 @@ private function handleMessage(Envelope $envelope, string $transportName, int $p
193194
return;
194195
}
195196

197+
$envelope = $envelope->withoutStampsOfType(FutureStamp::class);
196198
self::$futures[] = [$futureStamp->getFuture(), $envelope];
197199
} catch (\Throwable $e) {
198200
$this->preAck($envelope, $transportName, $acked, $e);
@@ -204,7 +206,7 @@ private function handleMessage(Envelope $envelope, string $transportName, int $p
204206
return;
205207
}
206208

207-
$this->handleFutures($transportName);
209+
$this->handleFutures($transportName, $parallelProcessesLimit);
208210
}
209211

210212
private function preAck(Envelope $envelope, string $transportName, bool $acked, $e): void
@@ -328,34 +330,36 @@ public function getMetadata(): WorkerMetadata
328330
return $this->metadata;
329331
}
330332

331-
public function handleFutures(string $transportName): void
333+
public function handleFutures(string $transportName, $parallelProcessLimit): void
332334
{
333335
$toHandle = self::$futures;
334336
self::$futures = [];
335-
$errors = [];
336337

337-
foreach ($toHandle as $future) {
338-
$e = null;
338+
if (!$toHandle) {
339+
return;
340+
}
341+
342+
$futuresReceived = [];
343+
$envelopesAssociated = [];
344+
345+
foreach ($toHandle as $combo) {
346+
$futuresReceived[] = $combo[0];
347+
$envelopesAssociated[] = $combo[1];
348+
}
349+
350+
[$errorsFromAwait, $executions] = awaitAll($futuresReceived);
351+
352+
foreach ($errorsFromAwait as $index => $error) {
339353
try {
340-
$execution = $future[0]->await();
354+
$execution = $futuresReceived[$index]->await();
341355
$envelope = $execution->await();
342356
} catch (\Throwable $e) {
343-
$errors[] = [$future[1]->withoutStampsOfType(FutureStamp::class), $e];
357+
$this->preAck($envelopesAssociated[$index]->withoutStampsOfType(FutureStamp::class), $transportName, false, $e);
344358

345359
continue;
346360
}
347361

348362
$this->preAck($envelope, $transportName, false, null);
349363
}
350-
351-
if (!$errors) {
352-
return;
353-
}
354-
355-
foreach ($errors as $combo) {
356-
[$envelope, $e] = $combo;
357-
358-
$this->preAck($envelope, $transportName, false, $e);
359-
}
360364
}
361365
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.