Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 99881e6

Browse filesBrowse files
feature #60018 [Messenger] Reset peak memory usage for each message (TimWolla)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] Reset peak memory usage for each message | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT PHP’s peak memory usage is a bit of global state that is not useful to keep in a long-running process handling individual self-contained messages, since a single high-memory message (handler) running early in a worker’s lifecycle will skew the numbers for all remaining messages processed by that worker. By resetting the peak memory usage for each message it becomes possible to measure a given message type’s maximum memory usage more accurately, allowing to optimize hardware resources, for example by placing individual messages with handlers requiring a high memory-usage into their own transport that is executed on a larger worker instance. As part of this change the cycle collection is also moved out of the Worker into an event-listener, since the cycle collection is not a core task of the Worker, since cycle collection would happen implicitly as well. Commits ------- 896ab90 [Messenger] Reset peak memory usage for each message
2 parents a8d6a36 + 896ab90 commit 99881e6
Copy full SHA for 99881e6

File tree

5 files changed

+107
-5
lines changed
Filter options

5 files changed

+107
-5
lines changed

‎src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php

Copy file name to clipboardExpand all lines: src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Redis\Transport\RedisTransportFactory;
1919
use Symfony\Component\Messenger\EventListener\AddErrorDetailsStampListener;
2020
use Symfony\Component\Messenger\EventListener\DispatchPcntlSignalListener;
21+
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2122
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
2223
use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener;
2324
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
@@ -218,6 +219,9 @@
218219
service('services_resetter'),
219220
])
220221

222+
->set('messenger.listener.reset_memory_usage', ResetMemoryUsageListener::class)
223+
->tag('kernel.event_subscriber')
224+
221225
->set('messenger.routable_message_bus', RoutableMessageBus::class)
222226
->args([
223227
abstract_arg('message bus locator'),

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add `Symfony\Component\Messenger\Middleware\DeduplicateMiddleware` and `Symfony\Component\Messenger\Stamp\DeduplicateStamp`
1010
* Add `--class-filter` option to the `messenger:failed:remove` command
1111
* Add `$stamps` parameter to `HandleTrait::handle`
12+
* Add `Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener` to reset PHP's peak memory usage for each processed message
1213

1314
7.2
1415
---
+48Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\EventListener;
13+
14+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
15+
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
16+
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
17+
18+
/**
19+
* @author Tim Düsterhus <tim@tideways-gmbh.com>
20+
*/
21+
final class ResetMemoryUsageListener implements EventSubscriberInterface
22+
{
23+
private bool $collect = false;
24+
25+
public function resetBefore(WorkerMessageReceivedEvent $event): void
26+
{
27+
// Reset the peak memory usage for accurate measurement of the
28+
// memory usage on a per-message basis.
29+
memory_reset_peak_usage();
30+
$this->collect = true;
31+
}
32+
33+
public function collectAfter(WorkerRunningEvent $event): void
34+
{
35+
if ($event->isWorkerIdle() && $this->collect) {
36+
gc_collect_cycles();
37+
$this->collect = false;
38+
}
39+
}
40+
41+
public static function getSubscribedEvents(): array
42+
{
43+
return [
44+
WorkerMessageReceivedEvent::class => ['resetBefore', -1024],
45+
WorkerRunningEvent::class => ['collectAfter', -1024],
46+
];
47+
}
48+
}

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+54-3Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
2727
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
2828
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
29+
use Symfony\Component\Messenger\EventListener\ResetMemoryUsageListener;
2930
use Symfony\Component\Messenger\EventListener\ResetServicesListener;
3031
use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener;
3132
use Symfony\Component\Messenger\Exception\RuntimeException;
@@ -586,7 +587,7 @@ public function testFlushBatchOnStop()
586587
$this->assertSame($expectedMessages, $handler->processedMessages);
587588
}
588589

589-
public function testGcCollectCyclesIsCalledOnMessageHandle()
590+
public function testGcCollectCyclesIsCalledOnIdleWorker()
590591
{
591592
$apiMessage = new DummyMessage('API');
592593

@@ -595,14 +596,64 @@ public function testGcCollectCyclesIsCalledOnMessageHandle()
595596
$bus = $this->createMock(MessageBusInterface::class);
596597

597598
$dispatcher = new EventDispatcher();
599+
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
600+
$before = 0;
601+
$dispatcher->addListener(WorkerRunningEvent::class, function (WorkerRunningEvent $event) use (&$before) {
602+
static $i = 0;
603+
604+
$after = gc_status()['runs'];
605+
if (0 === $i) {
606+
$this->assertFalse($event->isWorkerIdle());
607+
$this->assertSame(0, $after - $before);
608+
} else if (1 === $i) {
609+
$this->assertTrue($event->isWorkerIdle());
610+
$this->assertSame(1, $after - $before);
611+
} else if (3 === $i) {
612+
// Wait a few idle phases before stopping.
613+
$this->assertSame(1, $after - $before);
614+
$event->getWorker()->stop();
615+
}
616+
617+
$i++;
618+
}, PHP_INT_MIN);
619+
620+
621+
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
622+
623+
gc_collect_cycles();
624+
$before = gc_status()['runs'];
625+
626+
$worker->run([
627+
'sleep' => 0,
628+
]);
629+
}
630+
631+
public function testMemoryUsageIsResetOnMessageHandle()
632+
{
633+
$apiMessage = new DummyMessage('API');
634+
635+
$receiver = new DummyReceiver([[new Envelope($apiMessage)]]);
636+
637+
$bus = $this->createMock(MessageBusInterface::class);
638+
639+
$dispatcher = new EventDispatcher();
640+
$dispatcher->addSubscriber(new ResetMemoryUsageListener());
598641
$dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
599642

643+
// Allocate and deallocate 4 MB. The use of random_int() is to
644+
// prevent compile-time optimization.
645+
$memory = str_repeat(random_int(0, 1), 4 * 1024 * 1024);
646+
unset($memory);
647+
648+
$before = memory_get_peak_usage();
649+
600650
$worker = new Worker(['transport' => $receiver], $bus, $dispatcher);
601651
$worker->run();
602652

603-
$gcStatus = gc_status();
653+
// This should be roughly 4 MB smaller than $before.
654+
$after = memory_get_peak_usage();
604655

605-
$this->assertGreaterThan(0, $gcStatus['runs']);
656+
$this->assertTrue($after < $before);
606657
}
607658

608659
/**

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
-2Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ public function run(array $options = []): void
128128
// this should prevent multiple lower priority receivers from
129129
// blocking too long before the higher priority are checked
130130
if ($envelopeHandled) {
131-
gc_collect_cycles();
132-
133131
break;
134132
}
135133
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.