From aa7000a3ddfcd5c613a81a180ba5a1a6436e5ae2 Mon Sep 17 00:00:00 2001 From: Vincent Langlet Date: Mon, 3 Feb 2025 11:05:40 +0100 Subject: [PATCH] Deduplicate Middleware --- .../FrameworkExtension.php | 8 +++ .../Resources/config/messenger.php | 6 ++ .../FrameworkExtensionTestCase.php | 64 ++++++++++++----- src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Middleware/DeduplicateMiddleware.php | 51 ++++++++++++++ .../Messenger/Stamp/DeduplicateStamp.php | 42 ++++++++++++ .../Middleware/DeduplicateMiddlewareTest.php | 68 +++++++++++++++++++ src/Symfony/Component/Messenger/composer.json | 2 + 8 files changed, 225 insertions(+), 17 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Middleware/DeduplicateMiddleware.php create mode 100644 src/Symfony/Component/Messenger/Stamp/DeduplicateStamp.php create mode 100644 src/Symfony/Component/Messenger/Tests/Middleware/DeduplicateMiddlewareTest.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index f6160d49315b..83f9011ace03 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -120,6 +120,7 @@ use Symfony\Component\Messenger\Handler\BatchHandlerInterface; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware; use Symfony\Component\Messenger\Middleware\RouterContextMiddleware; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface; @@ -2266,6 +2267,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder ['id' => 'handle_message'], ], ]; + + if (class_exists(DeduplicateMiddleware::class) && class_exists(LockFactory::class)) { + $defaultMiddleware['before'][] = ['id' => 'deduplicate_middleware']; + } else { + $container->removeDefinition('messenger.middleware.deduplicate_middleware'); + } + foreach ($config['buses'] as $busId => $bus) { $middleware = $bus['middleware']; diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 40f5b84caa2e..8798d5f2e5e3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener; use Symfony\Component\Messenger\Handler\RedispatchMessageHandler; use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware; +use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware; use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; @@ -86,6 +87,11 @@ ->tag('monolog.logger', ['channel' => 'messenger']) ->call('setLogger', [service('logger')->ignoreOnInvalid()]) + ->set('messenger.middleware.deduplicate_middleware', DeduplicateMiddleware::class) + ->args([ + service('lock.factory'), + ]) + ->set('messenger.middleware.add_bus_name_stamp_middleware', AddBusNameStampMiddleware::class) ->abstract() diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php index 1f4daac5d453..2535707f5ef5 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php @@ -62,6 +62,7 @@ use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory; use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory; +use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware; use Symfony\Component\Messenger\Transport\TransportFactory; use Symfony\Component\Notifier\ChatterInterface; use Symfony\Component\Notifier\TexterInterface; @@ -1061,25 +1062,54 @@ public function testMessengerWithMultipleBuses() $this->assertTrue($container->has('messenger.bus.commands')); $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); - $this->assertEquals([ - ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], - ['id' => 'reject_redelivered_message_middleware'], - ['id' => 'dispatch_after_current_bus'], - ['id' => 'failed_message_processing_middleware'], - ['id' => 'send_message', 'arguments' => [true]], - ['id' => 'handle_message', 'arguments' => [false]], - ], $container->getParameter('messenger.bus.commands.middleware')); + + if (class_exists(DeduplicateMiddleware::class)) { + $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], + ['id' => 'reject_redelivered_message_middleware'], + ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], + ['id' => 'deduplicate_middleware'], + ['id' => 'send_message', 'arguments' => [true]], + ['id' => 'handle_message', 'arguments' => [false]], + ], $container->getParameter('messenger.bus.commands.middleware')); + } else { + $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], + ['id' => 'reject_redelivered_message_middleware'], + ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], + ['id' => 'send_message', 'arguments' => [true]], + ['id' => 'handle_message', 'arguments' => [false]], + ], $container->getParameter('messenger.bus.commands.middleware')); + } + $this->assertTrue($container->has('messenger.bus.events')); $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); - $this->assertEquals([ - ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], - ['id' => 'reject_redelivered_message_middleware'], - ['id' => 'dispatch_after_current_bus'], - ['id' => 'failed_message_processing_middleware'], - ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], - ['id' => 'send_message', 'arguments' => [true]], - ['id' => 'handle_message', 'arguments' => [false]], - ], $container->getParameter('messenger.bus.events.middleware')); + + if (class_exists(DeduplicateMiddleware::class)) { + $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], + ['id' => 'reject_redelivered_message_middleware'], + ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], + ['id' => 'deduplicate_middleware'], + ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], + ['id' => 'send_message', 'arguments' => [true]], + ['id' => 'handle_message', 'arguments' => [false]], + ], $container->getParameter('messenger.bus.events.middleware')); + } else { + $this->assertEquals([ + ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], + ['id' => 'reject_redelivered_message_middleware'], + ['id' => 'dispatch_after_current_bus'], + ['id' => 'failed_message_processing_middleware'], + ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], + ['id' => 'send_message', 'arguments' => [true]], + ['id' => 'handle_message', 'arguments' => [false]], + ], $container->getParameter('messenger.bus.events.middleware')); + } + $this->assertTrue($container->has('messenger.bus.queries')); $this->assertSame([], $container->getDefinition('messenger.bus.queries')->getArgument(0)); $this->assertEquals([ diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 4d492dfd4952..2d276d41660c 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -4,6 +4,7 @@ CHANGELOG 7.2 --- + * Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp` * Add `$previous` to the exception output at the `messenger:failed:show` command * `WrappedExceptionsInterface` now extends PHP's `Throwable` interface * Add `#[AsMessage]` attribute with `$transport` parameter for message routing diff --git a/src/Symfony/Component/Messenger/Middleware/DeduplicateMiddleware.php b/src/Symfony/Component/Messenger/Middleware/DeduplicateMiddleware.php new file mode 100644 index 000000000000..c252153b92a4 --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/DeduplicateMiddleware.php @@ -0,0 +1,51 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Stamp\DeduplicateStamp; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; + +final class DeduplicateMiddleware implements MiddlewareInterface +{ + public function __construct(private LockFactory $lockFactory) + { + } + + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + if (!$stamp = $envelope->last(DeduplicateStamp::class)) { + return $stack->next()->handle($envelope, $stack); + } + + if (!$envelope->last(ReceivedStamp::class)) { + $lock = $this->lockFactory->createLockFromKey($stamp->getKey(), $stamp->getTtl(), autoRelease: false); + + if (!$lock->acquire()) { + return $envelope; + } + } elseif ($stamp->onlyDeduplicateInQueue()) { + $this->lockFactory->createLockFromKey($stamp->getKey())->release(); + } + + try { + $envelope = $stack->next()->handle($envelope, $stack); + } finally { + if ($envelope->last(ReceivedStamp::class) && !$stamp->onlyDeduplicateInQueue()) { + $this->lockFactory->createLockFromKey($stamp->getKey())->release(); + } + } + + return $envelope; + } +} diff --git a/src/Symfony/Component/Messenger/Stamp/DeduplicateStamp.php b/src/Symfony/Component/Messenger/Stamp/DeduplicateStamp.php new file mode 100644 index 000000000000..4e08d5369f26 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/DeduplicateStamp.php @@ -0,0 +1,42 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +use Symfony\Component\Lock\Key; + +final class DeduplicateStamp implements StampInterface +{ + private Key $key; + + public function __construct( + string $key, + private ?float $ttl = 300.0, + private bool $onlyDeduplicateInQueue = false, + ) { + $this->key = new Key($key); + } + + public function onlyDeduplicateInQueue(): bool + { + return $this->onlyDeduplicateInQueue; + } + + public function getKey(): Key + { + return $this->key; + } + + public function getTtl(): ?float + { + return $this->ttl; + } +} diff --git a/src/Symfony/Component/Messenger/Tests/Middleware/DeduplicateMiddlewareTest.php b/src/Symfony/Component/Messenger/Tests/Middleware/DeduplicateMiddlewareTest.php new file mode 100644 index 000000000000..50a302406a7a --- /dev/null +++ b/src/Symfony/Component/Messenger/Tests/Middleware/DeduplicateMiddlewareTest.php @@ -0,0 +1,68 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Tests\Middleware; + +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\FlockStore; +use Symfony\Component\Lock\Store\SemaphoreStore; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware; +use Symfony\Component\Messenger\Stamp\DeduplicateStamp; +use Symfony\Component\Messenger\Stamp\ReceivedStamp; +use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase; +use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; + +final class DeduplicateMiddlewareTest extends MiddlewareTestCase +{ + public function testDeduplicateMiddlewareIgnoreIfMessageIsNotLockable() + { + $message = new DummyMessage('Hello'); + $envelope = new Envelope($message); + + $lockFactory = $this->createMock(LockFactory::class); + $lockFactory->expects($this->never())->method('createLock'); + + $decorator = new DeduplicateMiddleware($lockFactory); + + $decorator->handle($envelope, $this->getStackMock(true)); + } + + public function testDeduplicateMiddlewareIfMessageHasKey() + { + $message = new DummyMessage('Hello'); + $envelope = new Envelope($message, [new DeduplicateStamp('id')]); + + if (SemaphoreStore::isSupported()) { + $store = new SemaphoreStore(); + } else { + $store = new FlockStore(); + } + + $decorator = new DeduplicateMiddleware(new LockFactory($store)); + + $envelope = $decorator->handle($envelope, $this->getStackMock(true)); + $this->assertNotNull($envelope->last(DeduplicateStamp::class)); + + $message2 = new DummyMessage('Hello'); + $envelope2 = new Envelope($message2, [new DeduplicateStamp('id')]); + + $decorator->handle($envelope2, $this->getStackMock(false)); + + // Simulate receiving the first message + $envelope = $envelope->with(new ReceivedStamp('transport')); + $decorator->handle($envelope, $this->getStackMock(true)); + + $message3 = new DummyMessage('Hello'); + $envelope3 = new Envelope($message3, [new DeduplicateStamp('id')]); + $decorator->handle($envelope3, $this->getStackMock(true)); + } +} diff --git a/src/Symfony/Component/Messenger/composer.json b/src/Symfony/Component/Messenger/composer.json index 1f12f16662f8..94de9f2439c9 100644 --- a/src/Symfony/Component/Messenger/composer.json +++ b/src/Symfony/Component/Messenger/composer.json @@ -29,6 +29,7 @@ "symfony/http-kernel": "^6.4|^7.0", "symfony/process": "^6.4|^7.0", "symfony/property-access": "^6.4|^7.0", + "symfony/lock": "^6.4|^7.0", "symfony/rate-limiter": "^6.4|^7.0", "symfony/routing": "^6.4|^7.0", "symfony/serializer": "^6.4|^7.0", @@ -42,6 +43,7 @@ "symfony/event-dispatcher-contracts": "<2.5", "symfony/framework-bundle": "<6.4", "symfony/http-kernel": "<6.4", + "symfony/lock": "<6.4", "symfony/serializer": "<6.4" }, "autoload": {