Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 38f19a9

Browse filesBrowse files
committed
Revert "[Messenger] Removing "sync" transport and replacing it with much nicer config trick"
This reverts commit 3d4e59a.
1 parent 0472dbf commit 38f19a9
Copy full SHA for 38f19a9

File tree

Expand file treeCollapse file tree

11 files changed

+202
-88
lines changed
Filter options
Expand file treeCollapse file tree

11 files changed

+202
-88
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
+21-25Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1775,27 +1775,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17751775
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
17761776
}
17771777

1778-
$senderReferences = [];
1779-
$syncTransports = [];
1778+
$senderAliases = [];
17801779
$transportRetryReferences = [];
17811780
foreach ($config['transports'] as $name => $transport) {
17821781
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
17831782

1784-
if (0 === strpos($transport['dsn'], 'sync://')) {
1785-
$syncTransports[] = $name;
1786-
} else {
1787-
$transportDefinition = (new Definition(TransportInterface::class))
1788-
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
1789-
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1790-
->addTag('messenger.receiver', ['alias' => $name])
1791-
;
1792-
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
1793-
1794-
// alias => service_id
1795-
$senderReferences[$name] = new Reference($transportId);
1796-
// service_id => service_id
1797-
$senderReferences[$transportId] = new Reference($transportId);
1798-
}
1783+
$transportDefinition = (new Definition(TransportInterface::class))
1784+
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
1785+
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
1786+
->addTag('messenger.receiver', ['alias' => $name])
1787+
;
1788+
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
1789+
$senderAliases[$name] = $transportId;
17991790

18001791
if (null !== $transport['retry_strategy']['service']) {
18011792
$transportRetryReferences[$name] = new Reference($transport['retry_strategy']['service']);
@@ -1813,25 +1804,30 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
18131804
}
18141805
}
18151806

1807+
$senderReferences = [];
1808+
// alias => service_id
1809+
foreach ($senderAliases as $alias => $serviceId) {
1810+
$senderReferences[$alias] = new Reference($serviceId);
1811+
}
1812+
// service_id => service_id
1813+
foreach ($senderAliases as $serviceId) {
1814+
$senderReferences[$serviceId] = new Reference($serviceId);
1815+
}
1816+
18161817
$messageToSendersMapping = [];
18171818
foreach ($config['routing'] as $message => $messageConfiguration) {
18181819
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
18191820
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
18201821
}
18211822

1822-
// filter out "sync" senders
1823-
$realSenders = [];
1823+
// make sure senderAliases contains all senders
18241824
foreach ($messageConfiguration['senders'] as $sender) {
1825-
if (isset($senderReferences[$sender])) {
1826-
$realSenders[] = $sender;
1827-
} elseif (!\in_array($sender, $syncTransports, true)) {
1825+
if (!isset($senderReferences[$sender])) {
18281826
throw new LogicException(sprintf('Invalid Messenger routing configuration: the "%s" class is being routed to a sender called "%s". This is not a valid transport or service id.', $message, $sender));
18291827
}
18301828
}
18311829

1832-
if ($realSenders) {
1833-
$messageToSendersMapping[$message] = $realSenders;
1834-
}
1830+
$messageToSendersMapping[$message] = $messageConfiguration['senders'];
18351831
}
18361832

18371833
$container->getDefinition('messenger.senders_locator')

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@
7575
<tag name="messenger.transport_factory" />
7676
</service>
7777

78+
<service id="messenger.transport.sync.factory" class="Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory">
79+
<tag name="messenger.transport_factory" />
80+
<argument type="service" id="messenger.routable_message_bus" />
81+
</service>
82+
7883
<service id="messenger.transport.in_memory.factory" class="Symfony\Component\Messenger\Transport\InMemoryTransportFactory">
7984
<tag name="messenger.transport_factory" />
8085
<tag name="kernel.reset" method="reset" />

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_sync_transport.php
-15Lines changed: 0 additions & 15 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_sync_transport.xml
-24Lines changed: 0 additions & 24 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_sync_transport.yml
-10Lines changed: 0 additions & 10 deletions
This file was deleted.

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
-13Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
use Symfony\Bundle\FrameworkBundle\DependencyInjection\Compiler\AddAnnotationsCachedReaderPass;
1717
use Symfony\Bundle\FrameworkBundle\DependencyInjection\FrameworkExtension;
1818
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\DummyMessage;
19-
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\FooMessage;
20-
use Symfony\Bundle\FrameworkBundle\Tests\Fixtures\Messenger\SecondMessage;
2119
use Symfony\Bundle\FrameworkBundle\Tests\TestCase;
2220
use Symfony\Bundle\FullStack;
2321
use Symfony\Component\Cache\Adapter\AdapterInterface;
@@ -784,17 +782,6 @@ public function testMessengerInvalidTransportRouting()
784782
$this->createContainerFromFile('messenger_routing_invalid_transport');
785783
}
786784

787-
public function testMessengerSyncTransport()
788-
{
789-
$container = $this->createContainerFromFile('messenger_sync_transport');
790-
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
791-
792-
$sendersMapping = $senderLocatorDefinition->getArgument(0);
793-
$this->assertEquals(['amqp'], $sendersMapping[DummyMessage::class]);
794-
$this->assertArrayNotHasKey(SecondMessage::class, $sendersMapping);
795-
$this->assertEquals(['amqp'], $sendersMapping[FooMessage::class]);
796-
}
797-
798785
public function testTranslator()
799786
{
800787
$container = $this->createContainerFromFile('full');

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ CHANGELOG
44
4.4.0
55
-----
66

7-
* [BC BREAK] The `SyncTransport` and `SyncTransportFactory` classes were removed.
87
* Deprecated passing a `ContainerInterface` instance as first argument of the `ConsumeMessagesCommand` constructor,
98
pass a `RoutableMessageBus` instance instead.
109
* Added support for auto trimming of Redis streams.
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\MessageBusInterface;
16+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
17+
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
18+
use Symfony\Component\Messenger\Transport\Sync\SyncTransportFactory;
19+
20+
class SyncTransportFactoryTest extends TestCase
21+
{
22+
public function testCreateTransport()
23+
{
24+
$serializer = $this->createMock(SerializerInterface::class);
25+
$bus = $this->createMock(MessageBusInterface::class);
26+
$factory = new SyncTransportFactory($bus);
27+
$transport = $factory->createTransport('sync://', [], $serializer);
28+
$this->assertInstanceOf(SyncTransport::class, $transport);
29+
}
30+
}
+41Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\MessageBusInterface;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
use Symfony\Component\Messenger\Transport\Sync\SyncTransport;
19+
20+
class SyncTransportTest extends TestCase
21+
{
22+
public function testSend()
23+
{
24+
$bus = $this->createMock(MessageBusInterface::class);
25+
$bus->expects($this->once())
26+
->method('dispatch')
27+
->with($this->callback(function ($arg) {
28+
$this->assertInstanceOf(Envelope::class, $arg);
29+
30+
return true;
31+
}))
32+
->willReturnArgument(0);
33+
$message = new \stdClass();
34+
$envelope = new Envelope($message);
35+
$transport = new SyncTransport($bus);
36+
$envelope = $transport->send($envelope);
37+
38+
$this->assertSame($message, $envelope->getMessage());
39+
$this->assertNotNull($envelope->last(ReceivedStamp::class));
40+
}
41+
}
+65Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\MessageBusInterface;
17+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
18+
use Symfony\Component\Messenger\Stamp\SentStamp;
19+
use Symfony\Component\Messenger\Transport\TransportInterface;
20+
21+
/**
22+
* Transport that immediately marks messages as received and dispatches for handling.
23+
*
24+
* @author Ryan Weaver <ryan@symfonycasts.com>
25+
*/
26+
class SyncTransport implements TransportInterface
27+
{
28+
private $messageBus;
29+
30+
public function __construct(MessageBusInterface $messageBus)
31+
{
32+
$this->messageBus = $messageBus;
33+
}
34+
35+
public function get(): iterable
36+
{
37+
throw new InvalidArgumentException('You cannot receive messages from the Messenger SyncTransport.');
38+
}
39+
40+
public function stop(): void
41+
{
42+
throw new InvalidArgumentException('You cannot call stop() on the Messenger SyncTransport.');
43+
}
44+
45+
public function ack(Envelope $envelope): void
46+
{
47+
throw new InvalidArgumentException('You cannot call ack() on the Messenger SyncTransport.');
48+
}
49+
50+
public function reject(Envelope $envelope): void
51+
{
52+
throw new InvalidArgumentException('You cannot call reject() on the Messenger SyncTransport.');
53+
}
54+
55+
public function send(Envelope $envelope): Envelope
56+
{
57+
/** @var SentStamp|null $sentStamp */
58+
$sentStamp = $envelope->last(SentStamp::class);
59+
$alias = null === $sentStamp ? 'sync' : ($sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass());
60+
61+
$envelope = $envelope->with(new ReceivedStamp($alias));
62+
63+
return $this->messageBus->dispatch($envelope);
64+
}
65+
}
+40Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\Sync;
13+
14+
use Symfony\Component\Messenger\MessageBusInterface;
15+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
16+
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
17+
use Symfony\Component\Messenger\Transport\TransportInterface;
18+
19+
/**
20+
* @author Ryan Weaver <ryan@symfonycasts.com>
21+
*/
22+
class SyncTransportFactory implements TransportFactoryInterface
23+
{
24+
private $messageBus;
25+
26+
public function __construct(MessageBusInterface $messageBus)
27+
{
28+
$this->messageBus = $messageBus;
29+
}
30+
31+
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
32+
{
33+
return new SyncTransport($this->messageBus);
34+
}
35+
36+
public function supports(string $dsn, array $options): bool
37+
{
38+
return 0 === strpos($dsn, 'sync://');
39+
}
40+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.