From d3c7a2e0b3ef917ae38f3fda19667d0680335c08 Mon Sep 17 00:00:00 2001 From: Sergey Rabochiy Date: Tue, 26 Jul 2022 16:03:02 +1000 Subject: [PATCH] Batch handler draft --- .../FrameworkExtension.php | 3 +- .../Messenger/Attribute/AsMessageHandler.php | 1 + .../DependencyInjection/MessengerPass.php | 13 +++- .../Messenger/Handler/BatchHandlerAdapter.php | 78 +++++++++++++++++++ .../Handler/BatchStrategyInterface.php | 21 +++++ .../BatchStrategyProviderInterface.php | 17 ++++ .../Messenger/Handler/CountBatchStrategy.php | 36 +++++++++ .../Messenger/Handler/HandlerDescriptor.php | 18 ++++- .../Component/Messenger/Handler/Result.php | 29 +++++++ .../Handler/ResultWrappedHandler.php | 46 +++++++++++ .../Component/Messenger/Tests/WorkerTest.php | 55 +++++++++++++ 11 files changed, 313 insertions(+), 4 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Handler/BatchHandlerAdapter.php create mode 100644 src/Symfony/Component/Messenger/Handler/BatchStrategyInterface.php create mode 100644 src/Symfony/Component/Messenger/Handler/BatchStrategyProviderInterface.php create mode 100644 src/Symfony/Component/Messenger/Handler/CountBatchStrategy.php create mode 100644 src/Symfony/Component/Messenger/Handler/Result.php create mode 100644 src/Symfony/Component/Messenger/Handler/ResultWrappedHandler.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 2c2af01cf5b89..aaf2bdde33bb5 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -661,7 +661,8 @@ public function load(array $configs, ContainerBuilder $container) $container->registerAttributeForAutoconfiguration(AsMessageHandler::class, static function (ChildDefinition $definition, AsMessageHandler $attribute, \ReflectionClass|\ReflectionMethod $reflector): void { $tagAttributes = get_object_vars($attribute); $tagAttributes['from_transport'] = $tagAttributes['fromTransport']; - unset($tagAttributes['fromTransport']); + $tagAttributes['batch_strategy'] = $tagAttributes['batchStrategy']; + unset($tagAttributes['fromTransport'], $tagAttributes['batchStrategy']); if ($reflector instanceof \ReflectionMethod) { if (isset($tagAttributes['method'])) { throw new LogicException(sprintf('AsMessageHandler attribute cannot declare a method on "%s::%s()".', $reflector->class, $reflector->name)); diff --git a/src/Symfony/Component/Messenger/Attribute/AsMessageHandler.php b/src/Symfony/Component/Messenger/Attribute/AsMessageHandler.php index c25c8b2c974dc..9483793bbdb6e 100644 --- a/src/Symfony/Component/Messenger/Attribute/AsMessageHandler.php +++ b/src/Symfony/Component/Messenger/Attribute/AsMessageHandler.php @@ -25,6 +25,7 @@ public function __construct( public ?string $handles = null, public ?string $method = null, public int $priority = 0, + public ?string $batchStrategy = null, ) { } } diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index d50a93c9568ba..2d17829ba2c98 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -135,10 +135,12 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) $definitionId = $serviceId; } + $batchStrategy = isset($tag['batch_strategy']) ? new Reference($tag['batch_strategy']) : null; + $handlerToOriginalServiceIdMapping[$definitionId] = $serviceId; foreach ($buses as $handlerBus) { - $handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options]; + $handlersByBusAndMessage[$handlerBus][$message][$priority][] = [$definitionId, $options, $batchStrategy]; } } @@ -160,7 +162,14 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) foreach ($handlersByMessage as $message => $handlers) { $handlerDescriptors = []; foreach ($handlers as $handler) { - $definitions[$definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0])] = (new Definition(HandlerDescriptor::class))->setArguments([new Reference($handler[0]), $handler[1]]); + $definitionId = '.messenger.handler_descriptor.'.ContainerBuilder::hash($bus.':'.$message.':'.$handler[0]); + $definition = (new Definition(HandlerDescriptor::class)) + ->addArgument(new Reference($handler[0])) + ->addArgument($handler[1]) + ->addArgument($handler[2]) + ; + + $definitions[$definitionId] = $definition; $handlerDescriptors[] = new Reference($definitionId); } diff --git a/src/Symfony/Component/Messenger/Handler/BatchHandlerAdapter.php b/src/Symfony/Component/Messenger/Handler/BatchHandlerAdapter.php new file mode 100644 index 0000000000000..87a96c22253f3 --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchHandlerAdapter.php @@ -0,0 +1,78 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +/** + * @internal + */ +class BatchHandlerAdapter implements BatchHandlerInterface +{ + private readonly \Closure $handler; + private readonly BatchStrategyInterface $batchStrategy; + private \SplObjectStorage $ackMap; + private ?object $lastMessage; + + public function __construct(callable $handler, BatchStrategyInterface $batchStrategy) + { + $this->batchStrategy = $batchStrategy; + $this->handler = $handler(...); + + $this->ackMap = new \SplObjectStorage(); + $this->lastMessage = null; + } + + public function __invoke(object $message, Acknowledger $ack = null): mixed + { + $this->lastMessage = $message; + + if (null === $ack) { + $ack = new Acknowledger(get_debug_type($this)); + $this->ackMap[$message] = $ack; + + $this->flush(true); + + return $ack->getResult(); + } + + $this->ackMap[$message] = $ack; + if (!$this->shouldFlush()) { + return $this->ackMap->count(); + } + + $this->flush(true); + + return 0; + } + + /** + * {@inheritdoc} + */ + public function flush(bool $force): void + { + if (!$this->lastMessage) { + return; + } + + $ackMap = $this->ackMap; + $this->ackMap = new \SplObjectStorage(); + $this->lastMessage = null; + + $this->batchStrategy->beforeHandle(); + ($this->handler)(new Result($ackMap), ...\iterator_to_array($ackMap)); + $this->batchStrategy->afterHandle(); + } + + private function shouldFlush(): bool + { + return $this->lastMessage && $this->batchStrategy->shouldHandle($this->lastMessage); + } +} diff --git a/src/Symfony/Component/Messenger/Handler/BatchStrategyInterface.php b/src/Symfony/Component/Messenger/Handler/BatchStrategyInterface.php new file mode 100644 index 0000000000000..435e4cef56219 --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchStrategyInterface.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +interface BatchStrategyInterface +{ + public function shouldHandle(object $lastMessage): bool; + + public function beforeHandle(): void; + + public function afterHandle(): void; +} diff --git a/src/Symfony/Component/Messenger/Handler/BatchStrategyProviderInterface.php b/src/Symfony/Component/Messenger/Handler/BatchStrategyProviderInterface.php new file mode 100644 index 0000000000000..c9398ba81c566 --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/BatchStrategyProviderInterface.php @@ -0,0 +1,17 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +interface BatchStrategyProviderInterface +{ + public function getBatchStrategy(): BatchStrategyInterface; +} diff --git a/src/Symfony/Component/Messenger/Handler/CountBatchStrategy.php b/src/Symfony/Component/Messenger/Handler/CountBatchStrategy.php new file mode 100644 index 0000000000000..0567e1785741a --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/CountBatchStrategy.php @@ -0,0 +1,36 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +class CountBatchStrategy implements BatchStrategyInterface +{ + private int $bufferSize = 0; + + public function __construct(private readonly int $flushSize) + { + } + + public function shouldHandle(object $lastMessage): bool + { + return ++$this->bufferSize >= $this->flushSize; + } + + public function beforeHandle(): void + { + $this->bufferSize = 0; + } + + public function afterHandle(): void + { + // no operation + } +} diff --git a/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php b/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php index 8b872bd94b9cb..8dad0dfb5ef44 100644 --- a/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php +++ b/src/Symfony/Component/Messenger/Handler/HandlerDescriptor.php @@ -23,7 +23,7 @@ final class HandlerDescriptor private ?BatchHandlerInterface $batchHandler = null; private array $options; - public function __construct(callable $handler, array $options = []) + public function __construct(callable $handler, array $options = [], BatchStrategyInterface $batchStrategy = null) { $handler = $handler(...); @@ -45,6 +45,22 @@ public function __construct(callable $handler, array $options = []) $this->name = \get_class($handler).'::'.$r->name; } + + if (!$this->batchHandler && $r->isVariadic()) { + $h = $this->handler; + if (Result::class !== ($r->getParameters()[0] ?? null)?->getType()?->getName()) { + $h = new ResultWrappedHandler($h); + } + + if ($handler instanceof BatchStrategyProviderInterface) { + $batchStrategy = $handler->getBatchStrategy(); + } + + $h = new BatchHandlerAdapter($h, $batchStrategy ?: new CountBatchStrategy(1)); + $this->batchHandler = $h; + $this->handler = $h(...); + unset($h); + } } public function getHandler(): callable diff --git a/src/Symfony/Component/Messenger/Handler/Result.php b/src/Symfony/Component/Messenger/Handler/Result.php new file mode 100644 index 0000000000000..49ffe43dd1cdd --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/Result.php @@ -0,0 +1,29 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +final class Result +{ + public function __construct(private \SplObjectStorage $ackMap) + { + } + + public function ok(object $message, mixed $result = null): void + { + $this->ackMap[$message]->ack($result); + } + + public function error(object $message, \Throwable $e): void + { + $this->ackMap[$message]->nack($e); + } +} diff --git a/src/Symfony/Component/Messenger/Handler/ResultWrappedHandler.php b/src/Symfony/Component/Messenger/Handler/ResultWrappedHandler.php new file mode 100644 index 0000000000000..c4d271b33ea8d --- /dev/null +++ b/src/Symfony/Component/Messenger/Handler/ResultWrappedHandler.php @@ -0,0 +1,46 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Handler; + +/** + * @internal + */ +class ResultWrappedHandler +{ + public function __construct(private readonly \Closure $handler) + { + } + + public function __invoke(Result $r, object ...$messages): int + { + try { + $lastResult = ($this->handler)(...$messages); + } catch (\Throwable $e) { + foreach ($messages as $message) { + $r->error($message, $e); + } + + return \count($messages); + } + + $length = \count($messages); + $lastMessage = \array_pop($messages); + + foreach ($messages as $message) { + $r->ok($message); + } + + $r->ok($lastMessage, $lastResult); + + return $length; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/WorkerTest.php b/src/Symfony/Component/Messenger/Tests/WorkerTest.php index 747c6ed855d79..7809a87d895a3 100644 --- a/src/Symfony/Component/Messenger/Tests/WorkerTest.php +++ b/src/Symfony/Component/Messenger/Tests/WorkerTest.php @@ -29,6 +29,9 @@ use Symfony\Component\Messenger\Handler\Acknowledger; use Symfony\Component\Messenger\Handler\BatchHandlerInterface; use Symfony\Component\Messenger\Handler\BatchHandlerTrait; +use Symfony\Component\Messenger\Handler\BatchStrategyInterface; +use Symfony\Component\Messenger\Handler\BatchStrategyProviderInterface; +use Symfony\Component\Messenger\Handler\CountBatchStrategy; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\MessageBus; @@ -537,6 +540,43 @@ public function testFlushBatchOnStop() $this->assertSame($expectedMessages, $handler->processedMessages); } + + public function testVariadicBatchHandler() + { + $expectedMessages = [ + new DummyMessage('Hey'), + new DummyMessage('Bob'), + ]; + + $receiver = new DummyReceiver([ + [new Envelope($expectedMessages[0])], + [new Envelope($expectedMessages[1])], + ]); + + $handler = new VariadicBatchHandler(); + + $middleware = new HandleMessageMiddleware(new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($handler)], + ])); + + $bus = new MessageBus([$middleware]); + + $dispatcher = new EventDispatcher(); + $dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use ($receiver) { + static $i = 0; + if (1 < ++$i) { + $event->getWorker()->stop(); + $this->assertSame(2, $receiver->getAcknowledgeCount()); + } else { + $this->assertSame(0, $receiver->getAcknowledgeCount()); + } + }); + + $worker = new Worker([$receiver], $bus, $dispatcher); + $worker->run(); + + $this->assertSame($expectedMessages, $handler->processedMessages); + } } class DummyReceiver implements ReceiverInterface @@ -624,6 +664,21 @@ private function process(array $jobs): void } } +class VariadicBatchHandler implements BatchStrategyProviderInterface +{ + public $processedMessages; + + public function __invoke(DummyMessage ...$messages): void + { + $this->processedMessages = $messages; + } + + public function getBatchStrategy(): BatchStrategyInterface + { + return new CountBatchStrategy(2); + } +} + class ResettableDummyReceiver extends DummyReceiver implements ResetInterface { private $hasBeenReset = false;