Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9a65138

Browse filesBrowse files
committed
bug #46646 [Messenger] move resetting services at worker stopped into listener (Thomas Talbot)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] move resetting services at worker stopped into listener | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | Fix #46349 | License | MIT This PR tries to fix an issue about "--no-reset" introduced by a [previous fix](a4c8f4e). Instead of resetting the resettable receivers after the WorkerStoppedEvent dispatch, the ResetServicesListener subscribes to it and resets the resettable services. It allows --no-reset to work again because this option decides the ResetServicesListener to be added or not. Commits ------- 16cf4fc [Messenger] move resetting services at worker stopped into ResetServicesListener
2 parents 2faa354 + 16cf4fc commit 9a65138
Copy full SHA for 9a65138

File tree

5 files changed

+66
-22
lines changed
Filter options

5 files changed

+66
-22
lines changed

‎src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/EventListener/ResetServicesListener.php
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
1616
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
17+
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
18+
use Symfony\Contracts\Service\ResetInterface;
1719

1820
/**
1921
* @author Grégoire Pineau <lyrixx@lyrixx.info>
@@ -34,10 +36,16 @@ public function resetServices(WorkerRunningEvent $event): void
3436
}
3537
}
3638

39+
public function resetServicesAtStop(WorkerStoppedEvent $event): void
40+
{
41+
$this->servicesResetter->reset();
42+
}
43+
3744
public static function getSubscribedEvents(): array
3845
{
3946
return [
4047
WorkerRunningEvent::class => ['resetServices', -1024],
48+
WorkerStoppedEvent::class => ['resetServicesAtStop', -1024],
4149
];
4250
}
4351
}

‎src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Command/ConsumeMessagesCommandTest.php
+8-11Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
use Symfony\Component\Messenger\MessageBusInterface;
2727
use Symfony\Component\Messenger\RoutableMessageBus;
2828
use Symfony\Component\Messenger\Stamp\BusNameStamp;
29+
use Symfony\Component\Messenger\Tests\ResettableDummyReceiver;
2930
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
3031

3132
class ConsumeMessagesCommandTest extends TestCase
@@ -116,15 +117,11 @@ public function testRunWithResetServicesOption(bool $shouldReset)
116117
{
117118
$envelope = new Envelope(new \stdClass());
118119

119-
$receiver = $this->createMock(ReceiverInterface::class);
120-
$receiver
121-
->expects($this->exactly(3))
122-
->method('get')
123-
->willReturnOnConsecutiveCalls(
124-
[$envelope],
125-
[/* idle */],
126-
[$envelope, $envelope]
127-
);
120+
$receiver = new ResettableDummyReceiver([
121+
[$envelope],
122+
[/* idle */],
123+
[$envelope, $envelope],
124+
]);
128125
$msgCount = 3;
129126

130127
$receiverLocator = $this->createMock(ContainerInterface::class);
@@ -134,8 +131,7 @@ public function testRunWithResetServicesOption(bool $shouldReset)
134131
$bus = $this->createMock(RoutableMessageBus::class);
135132
$bus->expects($this->exactly($msgCount))->method('dispatch');
136133

137-
$servicesResetter = $this->createMock(ServicesResetter::class);
138-
$servicesResetter->expects($this->exactly($shouldReset ? $msgCount : 0))->method('reset');
134+
$servicesResetter = new ServicesResetter(new \ArrayIterator([$receiver]), ['reset']);
139135

140136
$command = new ConsumeMessagesCommand($bus, $receiverLocator, new EventDispatcher(), null, [], new ResetServicesListener($servicesResetter));
141137

@@ -148,6 +144,7 @@ public function testRunWithResetServicesOption(bool $shouldReset)
148144
'--limit' => $msgCount,
149145
], $shouldReset ? [] : ['--no-reset' => null]));
150146

147+
$this->assertEquals($shouldReset, $receiver->hasBeenReset(), '$receiver->reset() should have been called');
151148
$tester->assertCommandIsSuccessful();
152149
$this->assertStringContainsString('[OK] Consuming messages from transports "dummy-receiver"', $tester->getDisplay());
153150
}

‎src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/EventListener/ResetServicesListenerTest.php
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
1616
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
17+
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
1718
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
1819
use Symfony\Component\Messenger\Worker;
1920

@@ -38,4 +39,15 @@ public function testResetServices(bool $shouldReset)
3839
$resetListener = new ResetServicesListener($servicesResetter);
3940
$resetListener->resetServices($event);
4041
}
42+
43+
public function testResetServicesAtStop()
44+
{
45+
$servicesResetter = $this->createMock(ServicesResetter::class);
46+
$servicesResetter->expects($this->once())->method('reset');
47+
48+
$event = new WorkerStoppedEvent($this->createMock(Worker::class));
49+
50+
$resetListener = new ResetServicesListener($servicesResetter);
51+
$resetListener->resetServicesAtStop($event);
52+
}
4153
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+38-1Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\Log\LoggerInterface;
1616
use Symfony\Component\EventDispatcher\EventDispatcher;
17+
use Symfony\Component\HttpKernel\DependencyInjection\ServicesResetter;
1718
use Symfony\Component\Messenger\Envelope;
1819
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
1920
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
2021
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
2122
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
2223
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
2324
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
25+
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2426
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
2527
use Symfony\Component\Messenger\Exception\RuntimeException;
2628
use Symfony\Component\Messenger\Handler\Acknowledger;
@@ -103,15 +105,50 @@ public function testWorkerResetsConnectionIfReceiverIsResettable()
103105
{
104106
$resettableReceiver = new ResettableDummyReceiver([]);
105107

106-
$bus = $this->createMock(MessageBusInterface::class);
107108
$dispatcher = new EventDispatcher();
109+
$dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset'])));
108110

111+
$bus = $this->createMock(MessageBusInterface::class);
109112
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
110113
$worker->stop();
111114
$worker->run();
112115
$this->assertTrue($resettableReceiver->hasBeenReset());
113116
}
114117

118+
public function testWorkerResetsTransportsIfResetServicesListenerIsCalled()
119+
{
120+
$envelope = new Envelope(new DummyMessage('Hello'));
121+
$resettableReceiver = new ResettableDummyReceiver([[$envelope]]);
122+
123+
$dispatcher = new EventDispatcher();
124+
$dispatcher->addSubscriber(new ResetServicesListener(new ServicesResetter(new \ArrayIterator([$resettableReceiver]), ['reset'])));
125+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
126+
$event->getWorker()->stop();
127+
});
128+
129+
$bus = $this->createMock(MessageBusInterface::class);
130+
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
131+
$worker->run();
132+
$this->assertTrue($resettableReceiver->hasBeenReset());
133+
}
134+
135+
public function testWorkerDoesNotResetTransportsIfResetServicesListenerIsNotCalled()
136+
{
137+
$envelope = new Envelope(new DummyMessage('Hello'));
138+
$resettableReceiver = new ResettableDummyReceiver([[$envelope]]);
139+
140+
$bus = $this->createMock(MessageBusInterface::class);
141+
142+
$dispatcher = new EventDispatcher();
143+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) {
144+
$event->getWorker()->stop();
145+
});
146+
147+
$worker = new Worker([$resettableReceiver], $bus, $dispatcher);
148+
$worker->run();
149+
$this->assertFalse($resettableReceiver->hasBeenReset());
150+
}
151+
115152
public function testWorkerDoesNotSendNullMessagesToTheBus()
116153
{
117154
$receiver = new DummyReceiver([

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
-10Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ public function run(array $options = []): void
136136

137137
$this->flush(true);
138138
$this->dispatchEvent(new WorkerStoppedEvent($this));
139-
$this->resetReceiverConnections();
140139
}
141140

142141
private function handleMessage(Envelope $envelope, string $transportName): void
@@ -260,15 +259,6 @@ public function getMetadata(): WorkerMetadata
260259
return $this->metadata;
261260
}
262261

263-
private function resetReceiverConnections(): void
264-
{
265-
foreach ($this->receivers as $receiver) {
266-
if ($receiver instanceof ResetInterface) {
267-
$receiver->reset();
268-
}
269-
}
270-
}
271-
272262
private function dispatchEvent(object $event): void
273263
{
274264
if (null === $this->eventDispatcher) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.