Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 214a63a

Browse filesBrowse files
committed
[Messenger] Move container resetting after receiver acknowledging (in command)
1 parent 8bea384 commit 214a63a
Copy full SHA for 214a63a

File tree

13 files changed

+141
-65
lines changed
Filter options

13 files changed

+141
-65
lines changed

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1333,10 +1333,6 @@ function ($a) {
13331333
->fixXmlConfig('option')
13341334
->children()
13351335
->scalarNode('dsn')->end()
1336-
->booleanNode('reset_on_message')
1337-
->defaultFalse()
1338-
->info('Reset container services after each message. Turn it on when the transport is async and run in a worker.')
1339-
->end()
13401336
->scalarNode('serializer')->defaultNull()->info('Service id of a custom serializer to use.')->end()
13411337
->arrayNode('options')
13421338
->normalizeKeys(false)

‎src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php
-13Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,7 +1977,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
19771977

19781978
$senderAliases = [];
19791979
$transportRetryReferences = [];
1980-
$transportNamesForResetServices = [];
19811980
foreach ($config['transports'] as $name => $transport) {
19821981
$serializerId = $transport['serializer'] ?? 'messenger.default_serializer';
19831982
$transportDefinition = (new Definition(TransportInterface::class))
@@ -2006,18 +2005,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
20062005

20072006
$transportRetryReferences[$name] = new Reference($retryServiceId);
20082007
}
2009-
if ($transport['reset_on_message']) {
2010-
$transportNamesForResetServices[] = $name;
2011-
}
2012-
}
2013-
2014-
if ($transportNamesForResetServices) {
2015-
$container
2016-
->getDefinition('messenger.listener.reset_services')
2017-
->replaceArgument(1, $transportNamesForResetServices)
2018-
;
2019-
} else {
2020-
$container->removeDefinition('messenger.listener.reset_services');
20212008
}
20222009

20232010
$senderReferences = [];

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
service('event_dispatcher'),
143143
service('logger')->nullOnInvalid(),
144144
[], // Receiver names
145+
service('services_resetter'),
145146
])
146147
->tag('console.command')
147148
->tag('monolog.logger', ['channel' => 'messenger'])

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
-8Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21-
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2221
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2322
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
2423
use Symfony\Component\Messenger\EventListener\StopWorkerOnCustomStopExceptionListener;
@@ -198,13 +197,6 @@
198197
->set('messenger.listener.stop_worker_on_stop_exception_listener', StopWorkerOnCustomStopExceptionListener::class)
199198
->tag('kernel.event_subscriber')
200199

201-
->set('messenger.listener.reset_services', ResetServicesListener::class)
202-
->args([
203-
service('services_resetter'),
204-
abstract_arg('receivers names'),
205-
])
206-
->tag('kernel.event_subscriber')
207-
208200
->set('messenger.routable_message_bus', RoutableMessageBus::class)
209201
->args([
210202
abstract_arg('message bus locator'),

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,6 @@
505505
<xsd:attribute name="serializer" type="xsd:string" />
506506
<xsd:attribute name="dsn" type="xsd:string" />
507507
<xsd:attribute name="failure-transport" type="xsd:string" />
508-
<xsd:attribute name="reset-on-message" type="xsd:boolean" />
509508
</xsd:complexType>
510509

511510
<xsd:complexType name="messenger_retry_strategy">

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_transports.php
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
'default' => 'amqp://localhost/%2f/messages',
1212
'customised' => [
1313
'dsn' => 'amqp://localhost/%2f/messages?exchange_name=exchange_name',
14-
'reset_on_message' => true,
1514
'options' => ['queue' => ['name' => 'Queue']],
1615
'serializer' => 'messenger.transport.native_php_serializer',
1716
'retry_strategy' => [

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_transports.xml
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<framework:messenger failure-transport="failed">
1111
<framework:serializer default-serializer="messenger.transport.symfony_serializer" />
1212
<framework:transport name="default" dsn="amqp://localhost/%2f/messages" />
13-
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer" reset-on-message="true">
13+
<framework:transport name="customised" dsn="amqp://localhost/%2f/messages?exchange_name=exchange_name" serializer="messenger.transport.native_php_serializer">
1414
<framework:options>
1515
<framework:queue>
1616
<framework:name>Queue</framework:name>

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_transports.yml
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ framework:
88
default: 'amqp://localhost/%2f/messages'
99
customised:
1010
dsn: 'amqp://localhost/%2f/messages?exchange_name=exchange_name'
11-
reset_on_message: true
1211
options:
1312
queue:
1413
name: Queue

‎src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php
-4Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,6 @@ public function testMessenger()
722722
$this->assertTrue($container->hasDefinition('messenger.transport.redis.factory'));
723723
$this->assertTrue($container->hasDefinition('messenger.transport_factory'));
724724
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
725-
$this->assertFalse($container->hasDefinition('messenger.listener.reset_services'));
726725
}
727726

728727
public function testMessengerMultipleFailureTransports()
@@ -867,9 +866,6 @@ public function testMessengerTransports()
867866
return array_shift($values);
868867
}, $failureTransports);
869868
$this->assertEquals($expectedTransportsByFailureTransports, $failureTransportsReferences);
870-
871-
$this->assertTrue($container->hasDefinition('messenger.listener.reset_services'));
872-
$this->assertSame(['customised'], $container->getDefinition('messenger.listener.reset_services')->getArgument(1));
873869
}
874870

875871
public function testMessengerRouting()

‎src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
+26-3Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
use Symfony\Component\Console\Question\ChoiceQuestion;
2424
use Symfony\Component\Console\Style\SymfonyStyle;
2525
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
26+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
27+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2628
use Symfony\Component\Messenger\EventListener\StopWorkerOnFailureLimitListener;
2729
use Symfony\Component\Messenger\EventListener\StopWorkerOnMemoryLimitListener;
2830
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
@@ -43,14 +45,22 @@ class ConsumeMessagesCommand extends Command
4345
private $logger;
4446
private $receiverNames;
4547
private $eventDispatcher;
46-
47-
public function __construct(RoutableMessageBus $routableBus, ContainerInterface $receiverLocator, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, array $receiverNames = [])
48-
{
48+
private $servicesResetter;
49+
50+
public function __construct(
51+
RoutableMessageBus $routableBus,
52+
ContainerInterface $receiverLocator,
53+
EventDispatcherInterface $eventDispatcher,
54+
LoggerInterface $logger = null,
55+
array $receiverNames = [],
56+
$servicesResetter = null
57+
) {
4958
$this->routableBus = $routableBus;
5059
$this->receiverLocator = $receiverLocator;
5160
$this->logger = $logger;
5261
$this->receiverNames = $receiverNames;
5362
$this->eventDispatcher = $eventDispatcher;
63+
$this->servicesResetter = $servicesResetter;
5464

5565
parent::__construct();
5666
}
@@ -72,6 +82,7 @@ protected function configure(): void
7282
new InputOption('sleep', null, InputOption::VALUE_REQUIRED, 'Seconds to sleep before asking for new messages after no messages were found', 1),
7383
new InputOption('bus', 'b', InputOption::VALUE_REQUIRED, 'Name of the bus to which received messages should be dispatched (if not passed, bus is determined automatically)'),
7484
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
85+
new InputOption('reset-services-on-message', null, InputOption::VALUE_NEGATABLE, 'Reset (or do not --no-reset-services-on-message) container services after each message', false),
7586
])
7687
->setDescription(self::$defaultDescription)
7788
->setHelp(<<<'EOF'
@@ -159,6 +170,18 @@ protected function execute(InputInterface $input, OutputInterface $output)
159170
$receivers[$receiverName] = $this->receiverLocator->get($receiverName);
160171
}
161172

173+
if (!$input->hasParameterOption(['--reset-services-on-message', '--no-reset-services-on-message'], true)) {
174+
trigger_deprecation('symfony/messenger', '5.4', 'Not setting either "--reset-services-on-message" nor "--no-reset-services-on-message" option explicitly is deprecated, its default value will change to true in 6.0.');
175+
}
176+
177+
if ($input->getOption('reset-services-on-message')) {
178+
if (null === $this->servicesResetter) {
179+
throw new RuntimeException(sprintf('Please set a $servicesResetter with "%s" instance to use resetting services after each message.', ServicesResetter::class));
180+
}
181+
182+
$this->eventDispatcher->addSubscriber(new ResetServicesListener($this->servicesResetter));
183+
}
184+
162185
$stopsWhen = [];
163186
if ($limit = $input->getOption('limit')) {
164187
$stopsWhen[] = "processed {$limit} messages";

‎src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php
+5-14Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,38 +13,29 @@
1313

1414
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16-
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
17-
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18-
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
16+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
1917

2018
/**
2119
* @author Grégoire Pineau <lyrixx@lyrixx.info>
2220
*/
2321
class ResetServicesListener implements EventSubscriberInterface
2422
{
2523
private $servicesResetter;
26-
private $receiversName;
2724

28-
public function __construct(ServicesResetter $servicesResetter, array $receiversName)
25+
public function __construct(ServicesResetter $servicesResetter)
2926
{
3027
$this->servicesResetter = $servicesResetter;
31-
$this->receiversName = $receiversName;
3228
}
3329

34-
public function resetServices(AbstractWorkerMessageEvent $event)
30+
public function resetServices(): void
3531
{
36-
if (!\in_array($event->getReceiverName(), $this->receiversName, true)) {
37-
return;
38-
}
39-
4032
$this->servicesResetter->reset();
4133
}
4234

43-
public static function getSubscribedEvents()
35+
public static function getSubscribedEvents(): array
4436
{
4537
return [
46-
WorkerMessageHandledEvent::class => ['resetServices'],
47-
WorkerMessageFailedEvent::class => ['resetServices'],
38+
WorkerRunningEvent::class => ['resetServices'],
4839
];
4940
}
5041
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+104Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\DependencyInjection\ServiceLocator;
1919
use Symfony\Component\EventDispatcher\EventDispatcher;
2020
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
21+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
2122
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
2223
use Symfony\Component\Messenger\Envelope;
2324
use Symfony\Component\Messenger\MessageBusInterface;
@@ -61,6 +62,7 @@ public function testBasicRun()
6162
$tester->execute([
6263
'receivers' => ['dummy-receiver'],
6364
'--limit' => 1,
65+
'--no-reset-services-on-message' => null,
6466
]);
6567

6668
$tester->assertCommandIsSuccessful();
@@ -94,6 +96,108 @@ public function testRunWithBusOption()
9496
'receivers' => ['dummy-receiver'],
9597
'--bus' => 'dummy-bus',
9698
'--limit' => 1,
99+
'--no-reset-services-on-message' => null,
100+
]);
101+
102+
$tester->assertCommandIsSuccessful();
103+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
104+
}
105+
106+
public function provideRunWithResetServicesOption(): iterable
107+
{
108+
yield [true];
109+
yield [false];
110+
}
111+
112+
/**
113+
* @dataProvider provideRunWithResetServicesOption
114+
*/
115+
public function testRunWithResetServicesOption(bool $shouldReset)
116+
{
117+
$envelope = new Envelope(new \stdClass());
118+
119+
$receiver = $this->createMock(ReceiverInterface::class);
120+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
121+
122+
$receiverLocator = $this->createMock(ContainerInterface::class);
123+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
124+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
125+
126+
$bus = $this->createMock(RoutableMessageBus::class);
127+
$bus->expects($this->once())->method('dispatch');
128+
129+
$servicesResetter = $this->createMock(ServicesResetter::class);
130+
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
131+
132+
$command = new ConsumeMessagesCommand($bus, $receiverLocator, new EventDispatcher(), null, [], $servicesResetter);
133+
134+
$application = new Application();
135+
$application->add($command);
136+
$tester = new CommandTester($application->get('messenger:consume'));
137+
$tester->execute([
138+
'receivers' => ['dummy-receiver'],
139+
'--limit' => 1,
140+
$shouldReset ? '--reset-services-on-message' : '--no-reset-services-on-message' => null,
141+
]);
142+
143+
$tester->assertCommandIsSuccessful();
144+
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
145+
}
146+
147+
public function testErrorOnResetServicesWithoutResetter()
148+
{
149+
$receiver = $this->createMock(ReceiverInterface::class);
150+
$receiver->expects($this->never())->method('get');
151+
152+
$receiverLocator = $this->createConfiguredMock(ContainerInterface::class, [
153+
'has' => true,
154+
'get' => $receiver,
155+
]);
156+
157+
$command = new ConsumeMessagesCommand($this->createMock(RoutableMessageBus::class), $receiverLocator, new EventDispatcher());
158+
159+
$application = new Application();
160+
$application->add($command);
161+
$tester = new CommandTester($application->get('messenger:consume'));
162+
163+
$this->expectException(\RuntimeException::class);
164+
$this->expectExceptionMessage('Please set a $servicesResetter');
165+
166+
$tester->execute([
167+
'receivers' => ['dummy-receiver'],
168+
'--reset-services-on-message' => null,
169+
]);
170+
}
171+
172+
/**
173+
* @group legacy
174+
*/
175+
public function testBasicRunWithoutResetServicesOption()
176+
{
177+
$envelope = new Envelope(new \stdClass(), [new BusNameStamp('dummy-bus')]);
178+
179+
$receiver = $this->createMock(ReceiverInterface::class);
180+
$receiver->expects($this->once())->method('get')->willReturn([$envelope]);
181+
182+
$receiverLocator = $this->createMock(ContainerInterface::class);
183+
$receiverLocator->expects($this->once())->method('has')->with('dummy-receiver')->willReturn(true);
184+
$receiverLocator->expects($this->once())->method('get')->with('dummy-receiver')->willReturn($receiver);
185+
186+
$bus = $this->createMock(MessageBusInterface::class);
187+
$bus->expects($this->once())->method('dispatch');
188+
189+
$busLocator = $this->createMock(ContainerInterface::class);
190+
$busLocator->expects($this->once())->method('has')->with('dummy-bus')->willReturn(true);
191+
$busLocator->expects($this->once())->method('get')->with('dummy-bus')->willReturn($bus);
192+
193+
$command = new ConsumeMessagesCommand(new RoutableMessageBus($busLocator), $receiverLocator, new EventDispatcher());
194+
195+
$application = new Application();
196+
$application->add($command);
197+
$tester = new CommandTester($application->get('messenger:consume'));
198+
$tester->execute([
199+
'receivers' => ['dummy-receiver'],
200+
'--limit' => 1,
97201
]);
98202

99203
$tester->assertCommandIsSuccessful();

‎src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php
+4-15Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,16 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
16-
use Symfony\Component\Messenger\Envelope;
17-
use Symfony\Component\Messenger\Event\AbstractWorkerMessageEvent;
1816
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
1917

2018
class ResetServicesListenerTest extends TestCase
2119
{
22-
public function provideTests(): iterable
23-
{
24-
yield ['foo', true];
25-
yield ['bar', false];
26-
}
27-
28-
/** @dataProvider provideTests */
29-
public function test(string $receiverName, bool $shouldReset)
20+
public function test()
3021
{
3122
$servicesResetter = $this->createMock(ServicesResetter::class);
32-
$servicesResetter->expects($shouldReset ? $this->once() : $this->never())->method('reset');
33-
34-
$event = new class(new Envelope(new \stdClass()), $receiverName) extends AbstractWorkerMessageEvent {};
23+
$servicesResetter->expects($this->once())->method('reset');
3524

36-
$resetListener = new ResetServicesListener($servicesResetter, ['foo']);
37-
$resetListener->resetServices($event);
25+
$resetListener = new ResetServicesListener($servicesResetter);
26+
$resetListener->resetServices();
3827
}
3928
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.