Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a90eca6

Browse filesBrowse files
committed
feature #51651 [Scheduler] Fix stateful scheduler (valtzu)
This PR was merged into the 6.4 branch. Discussion ---------- [Scheduler] Fix stateful scheduler | Q | A | ------------- | --- | Branch? | 6.3 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | Fix #51646, #51384 | License | MIT Stateful scheduler seems rather broken at the moment, see #51384 (comment). Let's fix it by storing the original first run start time, that way it's always possible to recalculate the state. Catching-up works now: ``` [23:14:11.709710] Worker started [23:14:11.759318] every 2 seconds [23:14:11.760291] every 2 seconds [23:14:11.761257] every 2 seconds [23:14:11.763244] every 2 seconds [23:14:12.637054] every 2 seconds [23:14:14.620595] every 2 seconds [23:14:16.632170] every 2 seconds ``` Whereas before it would only start on from the current item, possibly skipping previous items like stated in #51646. _(is this a bc break?)_ I will be waiting for input from authors of the related issues. --- One test is failing because because `getNextRunDate` is called with `2020-02-20T01:59:00+02:00` and then next run date is expected at `2020-02-20T02:09:00+02:00` but we get `2020-02-20T02:00:00+02:00` because that's set as `from`. I don't quite get the logic, I would assume that it is expected to be run immediately on `from` :thinking: Commits ------- 2d5856b Fix stateful scheduler
2 parents fc2777f + 2d5856b commit a90eca6
Copy full SHA for a90eca6

File tree

Expand file treeCollapse file tree

10 files changed

+75
-26
lines changed
Filter options
Expand file treeCollapse file tree

10 files changed

+75
-26
lines changed

‎src/Symfony/Component/Scheduler/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ CHANGELOG
1010
* Add `AbstractTriggerDecorator`
1111
* Make `ScheduledStamp` "send-able"
1212
* Add `ScheduledStamp` to `RedispatchMessage`
13+
* Add `from()` to `CheckpointInterface`
1314

1415
6.3
1516
---

‎src/Symfony/Component/Scheduler/Generator/Checkpoint.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Generator/Checkpoint.php
+13-3Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
final class Checkpoint implements CheckpointInterface
1818
{
19+
private \DateTimeImmutable $from;
1920
private \DateTimeImmutable $time;
2021
private int $index = -1;
2122
private bool $reset = false;
@@ -41,14 +42,22 @@ public function acquire(\DateTimeImmutable $now): bool
4142
$this->save($now, -1);
4243
}
4344

44-
$this->time ??= $now;
4545
if ($this->cache) {
46-
$this->save(...$this->cache->get($this->name, fn () => [$now, -1]));
46+
[$this->time, $this->index, $this->from] = $this->cache->get($this->name, fn () => [$now, -1, $now]) + [2 => $now];
47+
$this->save($this->time, $this->index);
4748
}
4849

50+
$this->time ??= $now;
51+
$this->from ??= $now;
52+
4953
return true;
5054
}
5155

56+
public function from(): \DateTimeImmutable
57+
{
58+
return $this->from;
59+
}
60+
5261
public function time(): \DateTimeImmutable
5362
{
5463
return $this->time;
@@ -63,7 +72,8 @@ public function save(\DateTimeImmutable $time, int $index): void
6372
{
6473
$this->time = $time;
6574
$this->index = $index;
66-
$this->cache?->get($this->name, fn () => [$time, $index], \INF);
75+
$this->from ??= $time;
76+
$this->cache?->get($this->name, fn () => [$time, $index, $this->from], \INF);
6777
}
6878

6979
/**

‎src/Symfony/Component/Scheduler/Generator/CheckpointInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Generator/CheckpointInterface.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ interface CheckpointInterface
1515
{
1616
public function acquire(\DateTimeImmutable $now): bool;
1717

18+
public function from(): \DateTimeImmutable;
19+
1820
public function time(): \DateTimeImmutable;
1921

2022
public function index(): int;

‎src/Symfony/Component/Scheduler/Generator/MessageGenerator.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Generator/MessageGenerator.php
+11-3Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Scheduler\RecurringMessage;
1717
use Symfony\Component\Scheduler\Schedule;
1818
use Symfony\Component\Scheduler\ScheduleProviderInterface;
19+
use Symfony\Component\Scheduler\Trigger\StatefulTriggerInterface;
1920

2021
final class MessageGenerator implements MessageGeneratorInterface
2122
{
@@ -43,9 +44,10 @@ public function getMessages(): \Generator
4344
return;
4445
}
4546

47+
$startTime = $checkpoint->from();
4648
$lastTime = $checkpoint->time();
4749
$lastIndex = $checkpoint->index();
48-
$heap = $this->heap($lastTime);
50+
$heap = $this->heap($lastTime, $startTime);
4951

5052
while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
5153
/** @var \DateTimeImmutable $time */
@@ -79,7 +81,7 @@ public function getMessages(): \Generator
7981
$checkpoint->release($now, $this->waitUntil);
8082
}
8183

82-
private function heap(\DateTimeImmutable $time): TriggerHeap
84+
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
8385
{
8486
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
8587
return $this->triggerHeap;
@@ -88,7 +90,13 @@ private function heap(\DateTimeImmutable $time): TriggerHeap
8890
$heap = new TriggerHeap($time);
8991

9092
foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
91-
if (!$nextTime = $recurringMessage->getTrigger()->getNextRunDate($time)) {
93+
$trigger = $recurringMessage->getTrigger();
94+
95+
if ($trigger instanceof StatefulTriggerInterface) {
96+
$trigger->continue($startTime);
97+
}
98+
99+
if (!$nextTime = $trigger->getNextRunDate($time)) {
92100
continue;
93101
}
94102

‎src/Symfony/Component/Scheduler/RecurringMessage.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/RecurringMessage.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private function __construct(
4040
* @see https://en.wikipedia.org/wiki/ISO_8601#Durations
4141
* @see https://php.net/datetime.formats.relative
4242
*/
43-
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable $from = new \DateTimeImmutable(), string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
43+
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable|null $from = null, string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
4444
{
4545
return new self(new PeriodicalTrigger($frequency, $from, $until), $message);
4646
}

‎src/Symfony/Component/Scheduler/Tests/Generator/CheckpointTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Tests/Generator/CheckpointTest.php
+14-10Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function testWithStateInitStateOnFirstAcquiring()
4848
$this->assertTrue($checkpoint->acquire($now));
4949
$this->assertEquals($now, $checkpoint->time());
5050
$this->assertEquals(-1, $checkpoint->index());
51-
$this->assertEquals([$now, -1], $cache->get('cache', fn () => []));
51+
$this->assertEquals([$now, -1, $now], $cache->get('cache', fn () => []));
5252
}
5353

5454
public function testWithStateLoadStateOnAcquiring()
@@ -58,10 +58,10 @@ public function testWithStateLoadStateOnAcquiring()
5858

5959
$cache->get('cache', fn () => [$now, 0], \INF);
6060

61-
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
61+
$this->assertTrue($checkpoint->acquire($startedAt = $now->modify('1 min')));
6262
$this->assertEquals($now, $checkpoint->time());
6363
$this->assertEquals(0, $checkpoint->index());
64-
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
64+
$this->assertEquals([$now, 0, $startedAt], $cache->get('cache', fn () => []));
6565
}
6666

6767
public function testWithLockInitStateOnFirstAcquiring()
@@ -72,11 +72,12 @@ public function testWithLockInitStateOnFirstAcquiring()
7272

7373
$this->assertTrue($checkpoint->acquire($now));
7474
$this->assertEquals($now, $checkpoint->time());
75+
$this->assertEquals($now, $checkpoint->from());
7576
$this->assertEquals(-1, $checkpoint->index());
7677
$this->assertTrue($lock->isAcquired());
7778
}
7879

79-
public function testwithLockLoadStateOnAcquiring()
80+
public function testWithLockLoadStateOnAcquiring()
8081
{
8182
$lock = new Lock(new Key('lock'), new InMemoryStore());
8283
$checkpoint = new Checkpoint('dummy', $lock);
@@ -86,6 +87,7 @@ public function testwithLockLoadStateOnAcquiring()
8687

8788
$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
8889
$this->assertEquals($now, $checkpoint->time());
90+
$this->assertEquals($now, $checkpoint->from());
8991
$this->assertEquals(0, $checkpoint->index());
9092
$this->assertTrue($lock->isAcquired());
9193
}
@@ -105,12 +107,13 @@ public function testWithCacheSave()
105107
{
106108
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
107109
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
108-
$checkpoint->acquire($now->modify('-1 hour'));
110+
$checkpoint->acquire($startedAt = $now->modify('-1 hour'));
109111
$checkpoint->save($now, 3);
110112

111113
$this->assertSame($now, $checkpoint->time());
112114
$this->assertSame(3, $checkpoint->index());
113-
$this->assertEquals([$now, 3], $cache->get('cache', fn () => []));
115+
$this->assertSame($startedAt, $checkpoint->from());
116+
$this->assertEquals([$now, 3, $startedAt], $cache->get('cache', fn () => []));
114117
}
115118

116119
public function testWithLockSave()
@@ -119,11 +122,12 @@ public function testWithLockSave()
119122
$checkpoint = new Checkpoint('dummy', $lock);
120123
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
121124

122-
$checkpoint->acquire($now->modify('-1 hour'));
125+
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
123126
$checkpoint->save($now, 3);
124127

125128
$this->assertSame($now, $checkpoint->time());
126129
$this->assertSame(3, $checkpoint->index());
130+
$this->assertSame($startTime, $checkpoint->from());
127131
}
128132

129133
public function testWithLockAndCacheSave()
@@ -132,12 +136,12 @@ public function testWithLockAndCacheSave()
132136
$checkpoint = new Checkpoint('dummy', $lock, $cache = new ArrayAdapter());
133137
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
134138

135-
$checkpoint->acquire($now->modify('-1 hour'));
139+
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
136140
$checkpoint->save($now, 3);
137141

138142
$this->assertSame($now, $checkpoint->time());
139143
$this->assertSame(3, $checkpoint->index());
140-
$this->assertEquals([$now, 3], $cache->get('dummy', fn () => []));
144+
$this->assertEquals([$now, 3, $startTime], $cache->get('dummy', fn () => []));
141145
}
142146

143147
public function testWithCacheFullCycle()
@@ -161,7 +165,7 @@ public function testWithCacheFullCycle()
161165
$this->assertSame(3, $lastIndex);
162166
$this->assertEquals($now, $checkpoint->time());
163167
$this->assertSame(0, $checkpoint->index());
164-
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
168+
$this->assertEquals([$now, 0, $now], $cache->get('cache', fn () => []));
165169
}
166170

167171
public function testWithLockResetStateAfterLockedAcquiring()

‎src/Symfony/Component/Scheduler/Tests/Trigger/PeriodicalTriggerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Tests/Trigger/PeriodicalTriggerTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public static function providerGetNextRunDateAgain(): iterable
184184
yield [
185185
$trigger,
186186
new \DateTimeImmutable('2020-02-20T01:59:00+02:00'),
187-
new \DateTimeImmutable('2020-02-20T02:09:00+02:00'),
187+
new \DateTimeImmutable('2020-02-20T02:00:00+02:00'),
188188
];
189189
yield [
190190
$trigger,

‎src/Symfony/Component/Scheduler/Trigger/AbstractDecoratedTrigger.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Trigger/AbstractDecoratedTrigger.php
+8-1Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@
1414
/**
1515
* @author Kevin Bond <kevinbond@gmail.com>
1616
*/
17-
abstract class AbstractDecoratedTrigger implements TriggerInterface
17+
abstract class AbstractDecoratedTrigger implements StatefulTriggerInterface
1818
{
1919
public function __construct(private TriggerInterface $inner)
2020
{
2121
}
2222

23+
public function continue(\DateTimeImmutable $startedAt): void
24+
{
25+
if ($this->inner instanceof StatefulTriggerInterface) {
26+
$this->inner->continue($startedAt);
27+
}
28+
}
29+
2330
final public function inner(): TriggerInterface
2431
{
2532
$inner = $this->inner;

‎src/Symfony/Component/Scheduler/Trigger/PeriodicalTrigger.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Scheduler/Trigger/PeriodicalTrigger.php
+16-7Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,18 @@
1313

1414
use Symfony\Component\Scheduler\Exception\InvalidArgumentException;
1515

16-
class PeriodicalTrigger implements TriggerInterface
16+
class PeriodicalTrigger implements StatefulTriggerInterface
1717
{
1818
private float $intervalInSeconds = 0.0;
19-
private \DateTimeImmutable $from;
19+
private ?\DateTimeImmutable $from;
2020
private \DateTimeImmutable $until;
2121
private \DatePeriod $period;
2222
private string $description;
23+
private string|int|float|\DateInterval $interval;
2324

2425
public function __construct(
2526
string|int|float|\DateInterval $interval,
26-
string|\DateTimeImmutable $from = new \DateTimeImmutable(),
27+
string|\DateTimeImmutable|null $from = null,
2728
string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'),
2829
) {
2930
$this->from = \is_string($from) ? new \DateTimeImmutable($from) : $from;
@@ -70,7 +71,7 @@ public function __construct(
7071
$this->description = sprintf('every %s seconds', $this->intervalInSeconds);
7172
}
7273
} else {
73-
$this->period = new \DatePeriod($this->from, $i, $this->until);
74+
$this->interval = $i;
7475
}
7576
} catch (\Exception $e) {
7677
throw new InvalidArgumentException(sprintf('Invalid interval "%s": ', $interval instanceof \DateInterval ? 'instance of \DateInterval' : $interval).$e->getMessage(), 0, $e);
@@ -82,15 +83,22 @@ public function __toString(): string
8283
return $this->description;
8384
}
8485

86+
public function continue(\DateTimeImmutable $startedAt): void
87+
{
88+
$this->from ??= $startedAt;
89+
}
90+
8591
public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
8692
{
93+
$this->from ??= $run;
94+
8795
if ($this->intervalInSeconds) {
8896
if ($this->until <= $run) {
8997
return null;
9098
}
9199

92-
$fromDate = min($this->from, $run);
93-
$from = $fromDate->format('U.u');
100+
$fromDate = $this->from;
101+
$from = (float) $fromDate->format('U.u');
94102
$delta = $run->format('U.u') - $from;
95103
$recurrencesPassed = floor($delta / $this->intervalInSeconds);
96104
$nextRunTimestamp = sprintf('%.6F', ($recurrencesPassed + 1) * $this->intervalInSeconds + $from);
@@ -103,6 +111,7 @@ public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
103111
return $this->until > $nextRun ? $nextRun : null;
104112
}
105113

114+
$this->period ??= new \DatePeriod($this->from, $this->interval, $this->until);
106115
$iterator = $this->period->getIterator();
107116
while ($run >= $next = $iterator->current()) {
108117
$iterator->next();
@@ -130,6 +139,6 @@ private function canBeConvertedToSeconds(\DateInterval $interval): bool
130139

131140
private function calcInterval(\DateInterval $interval): float
132141
{
133-
return $this->from->setTimestamp(0)->add($interval)->format('U.u');
142+
return (float) (new \DateTimeImmutable('@0'))->add($interval)->format('U.u');
134143
}
135144
}
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Symfony\Component\Scheduler\Trigger;
4+
5+
interface StatefulTriggerInterface extends TriggerInterface
6+
{
7+
public function continue(\DateTimeImmutable $startedAt): void;
8+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.