Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit cf10c02

Browse filesBrowse files
committed
minor #34155 Revert SyncTransport simplification and fix properly (weaverryan)
This PR was squashed before being merged into the 4.4 branch (closes #34155). Discussion ---------- Revert SyncTransport simplification and fix properly | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | Fix #34115 (and also related to #34066) | License | MIT | Doc PR | Not needed In #34069, I made `SyncTransport` simpler by removing that transport class and making the whole things a config trick. I felt GREAT about that solution... until i realized two big problems: 1) It kills using env vars for `sync://` because we read the config values at build time - #34115 - that could probably be fixed by adding a factory, but then there is also the next problem 2) If someone routed a message to `[async, sync]` (weird, but allowed), my #34069 config solution basically maps this internally to `[async]`, which actually causes the message to *not* be handled immediately. Basically, my solution only worked if you route a message ONLY to one sync transport, but fails if you route to multiple transports. So... this fixes things in a less-cool, but sensible way: A) The first commit reverts #34069 exactly B) The second commit solves the issue that we need to know if a message is being handled in a "worker" context or not, so middleware can decide if they should reset things before/after handling things. Previously we were using `ReceivedStamp` to know this. But because `SyncTransport` also "receives" the message and adds this stamp, it's not enough. To fix this, I added a new `ConsumedByWorkerStamp` that clearly means: "This message is being handled by a worker" (and so, you might want to "reset" some things before/after handling). Thanks! Commits ------- 01a9fef Adding ConsumedByWorkerStamp as way to mark a message in a "worker context" 38f19a9 Revert "[Messenger] Removing "sync" transport and replacing it with much nicer config trick"
2 parents aea43b2 + 01a9fef commit cf10c02
Copy full SHA for cf10c02

22 files changed

+243
-107
lines changed

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Clears entity manager after calling all handlers.
@@ -28,7 +28,7 @@ protected function handleForManager(EntityManagerInterface $entityManager, Envel
2828
try {
2929
return $stack->next()->handle($envelope, $stack);
3030
} finally {
31-
if (null !== $envelope->last(ReceivedStamp::class)) {
31+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
3232
$entityManager->clear();
3333
}
3434
}

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Closes connection and therefore saves number of connections.
@@ -30,7 +30,7 @@ protected function handleForManager(EntityManagerInterface $entityManager, Envel
3030

3131
return $stack->next()->handle($envelope, $stack);
3232
} finally {
33-
if (null !== $envelope->last(ReceivedStamp::class)) {
33+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
3434
$connection->close();
3535
}
3636
}

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Checks whether the connection is still open or reconnects otherwise.
@@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
2525
{
2626
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
2727
{
28-
if (null !== $envelope->last(ReceivedStamp::class)) {
28+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
2929
$this->pingConnection($entityManager);
3030
}
3131

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
1717
use Symfony\Component\Messenger\Envelope;
1818
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
19-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
19+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2020
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2121

2222
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
@@ -36,7 +36,7 @@ public function testMiddlewareClearEntityManager()
3636
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
3737

3838
$envelope = new Envelope(new \stdClass(), [
39-
new ReceivedStamp('async'),
39+
new ConsumedByWorkerStamp(),
4040
]);
4141
$middleware->handle($envelope, $this->getStackMock());
4242
}

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
1818
use Symfony\Component\Messenger\Envelope;
1919
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
20-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
20+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2121
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2222

2323
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
@@ -51,7 +51,7 @@ public function testMiddlewareCloseConnection()
5151
;
5252

5353
$envelope = new Envelope(new \stdClass(), [
54-
new ReceivedStamp('async'),
54+
new ConsumedByWorkerStamp(),
5555
]);
5656
$this->middleware->handle($envelope, $this->getStackMock());
5757
}

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
1818
use Symfony\Component\Messenger\Envelope;
1919
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
20-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
20+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2121
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2222

2323
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
@@ -58,7 +58,7 @@ public function testMiddlewarePingOk()
5858
;
5959

6060
$envelope = new Envelope(new \stdClass(), [
61-
new ReceivedStamp('async'),
61+
new ConsumedByWorkerStamp(),
6262
]);
6363
$this->middleware->handle($envelope, $this->getStackMock());
6464
}
@@ -75,7 +75,7 @@ public function testMiddlewarePingResetEntityManager()
7575
;
7676

7777
$envelope = new Envelope(new \stdClass(), [
78-
new ReceivedStamp('async'),
78+
new ConsumedByWorkerStamp(),
7979
]);
8080
$this->middleware->handle($envelope, $this->getStackMock());
8181
}

‎src/Symfony/Bridge/Doctrine/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
3030
"symfony/form": "^4.4|^5.0",
3131
"symfony/http-kernel": "^3.4|^4.0|^5.0",
32-
"symfony/messenger": "^4.3|^5.0",
32+
"symfony/messenger": "^4.4|^5.0",
3333
"symfony/property-access": "^3.4|^4.0|^5.0",
3434
"symfony/property-info": "^3.4|^4.0|^5.0",
3535
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+21-25Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,27 +1755,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17551755
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
17561756
}
17571757

1758-
$senderReferences = [];
1759-
$syncTransports = [];
1758+
$senderAliases = [];
17601759
$transportRetryReferences = [];
17611760
foreach ($config['transports'] as $name => $transport) {
17621761
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
17631762

1764-
if (0 === strpos($transport['dsn'], 'sync://')) {
1765-
$syncTransports[] = $name;
1766-
} else {
1767-
$transportDefinition = (new Definition(TransportInterface::class))
1768-
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
1769-
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1770-
->addTag('messenger.receiver', ['alias' => $name])
1771-
;
1772-
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
1773-
1774-
// alias => service_id
1775-
$senderReferences[$name] = new Reference($transportId);
1776-
// service_id => service_id
1777-
$senderReferences[$transportId] = new Reference($transportId);
1778-
}
1763+
$transportDefinition = (new Definition(TransportInterface::class))
1764+
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
1765+
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1766+
->addTag('messenger.receiver', ['alias' => $name])
1767+
;
1768+
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
1769+
$senderAliases[$name] = $transportId;
17791770

17801771
if (null !== $transport['retry_strategy']['service']) {
17811772
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
@@ -1793,25 +1784,30 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17931784
}
17941785
}
17951786

1787+
$senderReferences = [];
1788+
// alias => service_id
1789+
foreach ($senderAliases as $alias => $serviceId) {
1790+
$senderReferences[$alias] = new Reference($serviceId);
1791+
}
1792+
// service_id => service_id
1793+
foreach ($senderAliases as $serviceId) {
1794+
$senderReferences[$serviceId] = new Reference($serviceId);
1795+
}
1796+
17961797
$messageToSendersMapping = [];
17971798
foreach ($config['routing'] as $message => $messageConfiguration) {
17981799
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
17991800
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
18001801
}
18011802

1802-
// filter out "sync" senders
1803-
$realSenders = [];
1803+
// make sure senderAliases contains all senders
18041804
foreach ($messageConfiguration['senders'] as $sender) {
1805-
if (isset($senderReferences[$sender])) {
1806-
$realSenders[] = $sender;
1807-
} elseif (!\in_array($sender, $syncTransports, true)) {
1805+
if (!isset($senderReferences[$sender])) {
18081806
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
18091807
}
18101808
}
18111809

1812-
if ($realSenders) {
1813-
$messageToSendersMapping[$message] = $realSenders;
1814-
}
1810+
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
18151811
}
18161812

18171813
$container->getDefinition('messenger.senders_locator')

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@
7575
<tag name="messenger.transport_factory" />
7676
</service>
7777

78+
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
79+
<tag name="messenger.transport_factory" />
80+
<argument type="service" id="messenger.routable_message_bus" />
81+
</service>
82+
7883
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
7984
<tag name="messenger.transport_factory" />
8085
<tag name="kernel.reset" method="reset" />

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php
-15Lines changed: 0 additions & 15 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml
-24Lines changed: 0 additions & 24 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml
-10Lines changed: 0 additions & 10 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
-13Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
1717
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
1818
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
19-
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;
20-
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage;
2119
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
2220
use Symfony\Bundle\FullStack;
2321
use Symfony\Component\Cache\Adapter\AdapterInterface;
@@ -784,17 +782,6 @@ public function testMessengerInvalidTransportRouting()
784782
$this->createContainerFromFile('messenger_routing_invalid_transport');
785783
}
786784

787-
public function testMessengerSyncTransport()
788-
{
789-
$container = $this->createContainerFromFile('messenger_sync_transport');
790-
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
791-
792-
$sendersMapping = $senderLocatorDefinition->getArgument(0);
793-
$this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]);
794-
$this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping);
795-
$this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]);
796-
}
797-
798785
public function testTranslator()
799786
{
800787
$container = $this->createContainerFromFile('full');

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ CHANGELOG
44
4.4.0
55
-----
66

7-
* [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed.
87
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
98
pass a `RoutableMessageBus` instance instead.
109
* Added support for auto trimming of Redis streams.
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that this message was consumed by a worker process.
16+
*/
17+
class ConsumedByWorkerStamp implements NonSendableStampInterface
18+
{
19+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ public function testItReceivesSignals()
174174
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
175175
with stamps: [
176176
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
177-
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
177+
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
178+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
178179
]
179180
Done.
180181

+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\MessageBusInterface;
16+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
17+
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
18+
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
19+
20+
class SyncTransportFactoryTest extends TestCase
21+
{
22+
public function testCreateTransport()
23+
{
24+
$serializer = $this->createMock(SerializerInterface::class);
25+
$bus = $this->createMock(MessageBusInterface::class);
26+
$factory = new SyncTransportFactory($bus);
27+
$transport = $factory->createTransport('sync://', [], $serializer);
28+
$this->assertInstanceOf(SyncTransport::class, $transport);
29+
}
30+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.