Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit aa7000a

Browse filesBrowse files
Deduplicate Middleware
1 parent ecb9728 commit aa7000a
Copy full SHA for aa7000a

File tree

8 files changed

+225
-17
lines changed
Filter options

8 files changed

+225
-17
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+8
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@
120120
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
121121
use Symfony\Component\Messenger\MessageBus;
122122
use Symfony\Component\Messenger\MessageBusInterface;
123+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
123124
use Symfony\Component\Messenger\Middleware\RouterContextMiddleware;
124125
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
125126
use Symfony\Component\Messenger\Transport\TransportFactoryInterface as MessengerTransportFactoryInterface;
@@ -2266,6 +2267,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22662267
['id' => 'handle_message'],
22672268
],
22682269
];
2270+
2271+
if (class_exists(DeduplicateMiddleware::class) && class_exists(LockFactory::class)) {
2272+
$defaultMiddleware['before'][] = ['id' => 'deduplicate_middleware'];
2273+
} else {
2274+
$container->removeDefinition('messenger.middleware.deduplicate_middleware');
2275+
}
2276+
22692277
foreach ($config['buses'] as $busId => $bus) {
22702278
$middleware = $bus['middleware'];
22712279

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
2626
use Symfony\Component\Messenger\Handler\RedispatchMessageHandler;
2727
use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware;
28+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
2829
use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware;
2930
use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware;
3031
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
@@ -86,6 +87,11 @@
8687
->tag('monolog.logger', ['channel' => 'messenger'])
8788
->call('setLogger', [service('logger')->ignoreOnInvalid()])
8889

90+
->set('messenger.middleware.deduplicate_middleware', DeduplicateMiddleware::class)
91+
->args([
92+
service('lock.factory'),
93+
])
94+
8995
->set('messenger.middleware.add_bus_name_stamp_middleware', AddBusNameStampMiddleware::class)
9096
->abstract()
9197

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTestCase.php
+47-17
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
6363
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
6464
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
65+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
6566
use Symfony\Component\Messenger\Transport\TransportFactory;
6667
use Symfony\Component\Notifier\ChatterInterface;
6768
use Symfony\Component\Notifier\TexterInterface;
@@ -1061,25 +1062,54 @@ public function testMessengerWithMultipleBuses()
10611062

10621063
$this->assertTrue($container->has('messenger.bus.commands'));
10631064
$this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0));
1064-
$this->assertEquals([
1065-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1066-
['id' => 'reject_redelivered_message_middleware'],
1067-
['id' => 'dispatch_after_current_bus'],
1068-
['id' => 'failed_message_processing_middleware'],
1069-
['id' => 'send_message', 'arguments' => [true]],
1070-
['id' => 'handle_message', 'arguments' => [false]],
1071-
], $container->getParameter('messenger.bus.commands.middleware'));
1065+
1066+
if (class_exists(DeduplicateMiddleware::class)) {
1067+
$this->assertEquals([
1068+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1069+
['id' => 'reject_redelivered_message_middleware'],
1070+
['id' => 'dispatch_after_current_bus'],
1071+
['id' => 'failed_message_processing_middleware'],
1072+
['id' => 'deduplicate_middleware'],
1073+
['id' => 'send_message', 'arguments' => [true]],
1074+
['id' => 'handle_message', 'arguments' => [false]],
1075+
], $container->getParameter('messenger.bus.commands.middleware'));
1076+
} else {
1077+
$this->assertEquals([
1078+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']],
1079+
['id' => 'reject_redelivered_message_middleware'],
1080+
['id' => 'dispatch_after_current_bus'],
1081+
['id' => 'failed_message_processing_middleware'],
1082+
['id' => 'send_message', 'arguments' => [true]],
1083+
['id' => 'handle_message', 'arguments' => [false]],
1084+
], $container->getParameter('messenger.bus.commands.middleware'));
1085+
}
1086+
10721087
$this->assertTrue($container->has('messenger.bus.events'));
10731088
$this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0));
1074-
$this->assertEquals([
1075-
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1076-
['id' => 'reject_redelivered_message_middleware'],
1077-
['id' => 'dispatch_after_current_bus'],
1078-
['id' => 'failed_message_processing_middleware'],
1079-
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1080-
['id' => 'send_message', 'arguments' => [true]],
1081-
['id' => 'handle_message', 'arguments' => [false]],
1082-
], $container->getParameter('messenger.bus.events.middleware'));
1089+
1090+
if (class_exists(DeduplicateMiddleware::class)) {
1091+
$this->assertEquals([
1092+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1093+
['id' => 'reject_redelivered_message_middleware'],
1094+
['id' => 'dispatch_after_current_bus'],
1095+
['id' => 'failed_message_processing_middleware'],
1096+
['id' => 'deduplicate_middleware'],
1097+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1098+
['id' => 'send_message', 'arguments' => [true]],
1099+
['id' => 'handle_message', 'arguments' => [false]],
1100+
], $container->getParameter('messenger.bus.events.middleware'));
1101+
} else {
1102+
$this->assertEquals([
1103+
['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']],
1104+
['id' => 'reject_redelivered_message_middleware'],
1105+
['id' => 'dispatch_after_current_bus'],
1106+
['id' => 'failed_message_processing_middleware'],
1107+
['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]],
1108+
['id' => 'send_message', 'arguments' => [true]],
1109+
['id' => 'handle_message', 'arguments' => [false]],
1110+
], $container->getParameter('messenger.bus.events.middleware'));
1111+
}
1112+
10831113
$this->assertTrue($container->has('messenger.bus.queries'));
10841114
$this->assertSame([], $container->getDefinition('messenger.bus.queries')->getArgument(0));
10851115
$this->assertEquals([

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ CHANGELOG
44
7.2
55
---
66

7+
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
78
* Add `$previous` to the exception output at the `messenger:failed:show` command
89
* `WrappedExceptionsInterface` now extends PHP's `Throwable` interface
910
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
19+
final class DeduplicateMiddleware implements MiddlewareInterface
20+
{
21+
public function __construct(private LockFactory $lockFactory)
22+
{
23+
}
24+
25+
public function handle(Envelope $envelope, StackInterface $stack): Envelope
26+
{
27+
if (!$stamp = $envelope->last(DeduplicateStamp::class)) {
28+
return $stack->next()->handle($envelope, $stack);
29+
}
30+
31+
if (!$envelope->last(ReceivedStamp::class)) {
32+
$lock = $this->lockFactory->createLockFromKey($stamp->getKey(), $stamp->getTtl(), autoRelease: false);
33+
34+
if (!$lock->acquire()) {
35+
return $envelope;
36+
}
37+
} elseif ($stamp->onlyDeduplicateInQueue()) {
38+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
39+
}
40+
41+
try {
42+
$envelope = $stack->next()->handle($envelope, $stack);
43+
} finally {
44+
if ($envelope->last(ReceivedStamp::class) && !$stamp->onlyDeduplicateInQueue()) {
45+
$this->lockFactory->createLockFromKey($stamp->getKey())->release();
46+
}
47+
}
48+
49+
return $envelope;
50+
}
51+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
use Symfony\Component\Lock\Key;
15+
16+
final class DeduplicateStamp implements StampInterface
17+
{
18+
private Key $key;
19+
20+
public function __construct(
21+
string $key,
22+
private ?float $ttl = 300.0,
23+
private bool $onlyDeduplicateInQueue = false,
24+
) {
25+
$this->key = new Key($key);
26+
}
27+
28+
public function onlyDeduplicateInQueue(): bool
29+
{
30+
return $this->onlyDeduplicateInQueue;
31+
}
32+
33+
public function getKey(): Key
34+
{
35+
return $this->key;
36+
}
37+
38+
public function getTtl(): ?float
39+
{
40+
return $this->ttl;
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Middleware;
13+
14+
use Symfony\Component\Lock\LockFactory;
15+
use Symfony\Component\Lock\Store\FlockStore;
16+
use Symfony\Component\Lock\Store\SemaphoreStore;
17+
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Middleware\DeduplicateMiddleware;
19+
use Symfony\Component\Messenger\Stamp\DeduplicateStamp;
20+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
21+
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
22+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
23+
24+
final class DeduplicateMiddlewareTest extends MiddlewareTestCase
25+
{
26+
public function testDeduplicateMiddlewareIgnoreIfMessageIsNotLockable()
27+
{
28+
$message = new DummyMessage('Hello');
29+
$envelope = new Envelope($message);
30+
31+
$lockFactory = $this->createMock(LockFactory::class);
32+
$lockFactory->expects($this->never())->method('createLock');
33+
34+
$decorator = new DeduplicateMiddleware($lockFactory);
35+
36+
$decorator->handle($envelope, $this->getStackMock(true));
37+
}
38+
39+
public function testDeduplicateMiddlewareIfMessageHasKey()
40+
{
41+
$message = new DummyMessage('Hello');
42+
$envelope = new Envelope($message, [new DeduplicateStamp('id')]);
43+
44+
if (SemaphoreStore::isSupported()) {
45+
$store = new SemaphoreStore();
46+
} else {
47+
$store = new FlockStore();
48+
}
49+
50+
$decorator = new DeduplicateMiddleware(new LockFactory($store));
51+
52+
$envelope = $decorator->handle($envelope, $this->getStackMock(true));
53+
$this->assertNotNull($envelope->last(DeduplicateStamp::class));
54+
55+
$message2 = new DummyMessage('Hello');
56+
$envelope2 = new Envelope($message2, [new DeduplicateStamp('id')]);
57+
58+
$decorator->handle($envelope2, $this->getStackMock(false));
59+
60+
// Simulate receiving the first message
61+
$envelope = $envelope->with(new ReceivedStamp('transport'));
62+
$decorator->handle($envelope, $this->getStackMock(true));
63+
64+
$message3 = new DummyMessage('Hello');
65+
$envelope3 = new Envelope($message3, [new DeduplicateStamp('id')]);
66+
$decorator->handle($envelope3, $this->getStackMock(true));
67+
}
68+
}

‎src/Symfony/Component/Messenger/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/composer.json
+2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
"symfony/http-kernel": "^6.4|^7.0",
3030
"symfony/process": "^6.4|^7.0",
3131
"symfony/property-access": "^6.4|^7.0",
32+
"symfony/lock": "^6.4|^7.0",
3233
"symfony/rate-limiter": "^6.4|^7.0",
3334
"symfony/routing": "^6.4|^7.0",
3435
"symfony/serializer": "^6.4|^7.0",
@@ -42,6 +43,7 @@
4243
"symfony/event-dispatcher-contracts": "<2.5",
4344
"symfony/framework-bundle": "<6.4",
4445
"symfony/http-kernel": "<6.4",
46+
"symfony/lock": "<6.4",
4547
"symfony/serializer": "<6.4"
4648
},
4749
"autoload": {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.