Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Scheduler] Fix stateful scheduler #51651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix stateful scheduler
  • Loading branch information
valtzu committed Sep 15, 2023
commit 2d5856b6bee3f80444b7db49bfcdb981ee6edbb8
1 change: 1 addition & 0 deletions 1 src/Symfony/Component/Scheduler/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ CHANGELOG
* Add `AbstractTriggerDecorator`
* Make `ScheduledStamp` "send-able"
* Add `ScheduledStamp` to `RedispatchMessage`
* Add `from()` to `CheckpointInterface`

6.3
---
Expand Down
16 changes: 13 additions & 3 deletions 16 src/Symfony/Component/Scheduler/Generator/Checkpoint.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

final class Checkpoint implements CheckpointInterface
{
private \DateTimeImmutable $from;
private \DateTimeImmutable $time;
private int $index = -1;
private bool $reset = false;
Expand All @@ -41,14 +42,22 @@ public function acquire(\DateTimeImmutable $now): bool
$this->save($now, -1);
}

$this->time ??= $now;
if ($this->cache) {
$this->save(...$this->cache->get($this->name, fn () => [$now, -1]));
[$this->time, $this->index, $this->from] = $this->cache->get($this->name, fn () => [$now, -1, $now]) + [2 => $now];
$this->save($this->time, $this->index);
}

$this->time ??= $now;
$this->from ??= $now;

return true;
}

public function from(): \DateTimeImmutable
{
return $this->from;
}

public function time(): \DateTimeImmutable
{
return $this->time;
Expand All @@ -63,7 +72,8 @@ public function save(\DateTimeImmutable $time, int $index): void
{
$this->time = $time;
$this->index = $index;
$this->cache?->get($this->name, fn () => [$time, $index], \INF);
$this->from ??= $time;
$this->cache?->get($this->name, fn () => [$time, $index, $this->from], \INF);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ interface CheckpointInterface
{
public function acquire(\DateTimeImmutable $now): bool;

public function from(): \DateTimeImmutable;
fabpot marked this conversation as resolved.
Show resolved Hide resolved

public function time(): \DateTimeImmutable;

public function index(): int;
Expand Down
14 changes: 11 additions & 3 deletions 14 src/Symfony/Component/Scheduler/Generator/MessageGenerator.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\Scheduler\RecurringMessage;
use Symfony\Component\Scheduler\Schedule;
use Symfony\Component\Scheduler\ScheduleProviderInterface;
use Symfony\Component\Scheduler\Trigger\StatefulTriggerInterface;

final class MessageGenerator implements MessageGeneratorInterface
{
Expand Down Expand Up @@ -43,9 +44,10 @@ public function getMessages(): \Generator
return;
}

$startTime = $checkpoint->from();
$lastTime = $checkpoint->time();
$lastIndex = $checkpoint->index();
$heap = $this->heap($lastTime);
$heap = $this->heap($lastTime, $startTime);

while (!$heap->isEmpty() && $heap->top()[0] <= $now) {
/** @var \DateTimeImmutable $time */
Expand Down Expand Up @@ -79,7 +81,7 @@ public function getMessages(): \Generator
$checkpoint->release($now, $this->waitUntil);
}

private function heap(\DateTimeImmutable $time): TriggerHeap
private function heap(\DateTimeImmutable $time, \DateTimeImmutable $startTime): TriggerHeap
{
if (isset($this->triggerHeap) && $this->triggerHeap->time <= $time) {
return $this->triggerHeap;
Expand All @@ -88,7 +90,13 @@ private function heap(\DateTimeImmutable $time): TriggerHeap
$heap = new TriggerHeap($time);

foreach ($this->schedule()->getRecurringMessages() as $index => $recurringMessage) {
if (!$nextTime = $recurringMessage->getTrigger()->getNextRunDate($time)) {
$trigger = $recurringMessage->getTrigger();

if ($trigger instanceof StatefulTriggerInterface) {
$trigger->continue($startTime);
}

if (!$nextTime = $trigger->getNextRunDate($time)) {
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion 2 src/Symfony/Component/Scheduler/RecurringMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private function __construct(
* @see https://en.wikipedia.org/wiki/ISO_8601#Durations
* @see https://php.net/datetime.formats.relative
*/
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable $from = new \DateTimeImmutable(), string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
public static function every(string|int|\DateInterval $frequency, object $message, string|\DateTimeImmutable|null $from = null, string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01')): self
{
return new self(new PeriodicalTrigger($frequency, $from, $until), $message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function testWithStateInitStateOnFirstAcquiring()
$this->assertTrue($checkpoint->acquire($now));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(-1, $checkpoint->index());
$this->assertEquals([$now, -1], $cache->get('cache', fn () => []));
$this->assertEquals([$now, -1, $now], $cache->get('cache', fn () => []));
}

public function testWithStateLoadStateOnAcquiring()
Expand All @@ -58,10 +58,10 @@ public function testWithStateLoadStateOnAcquiring()

$cache->get('cache', fn () => [$now, 0], \INF);

$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
$this->assertTrue($checkpoint->acquire($startedAt = $now->modify('1 min')));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals(0, $checkpoint->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
$this->assertEquals([$now, 0, $startedAt], $cache->get('cache', fn () => []));
}

public function testWithLockInitStateOnFirstAcquiring()
Expand All @@ -72,11 +72,12 @@ public function testWithLockInitStateOnFirstAcquiring()

$this->assertTrue($checkpoint->acquire($now));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals($now, $checkpoint->from());
$this->assertEquals(-1, $checkpoint->index());
$this->assertTrue($lock->isAcquired());
}

public function testwithLockLoadStateOnAcquiring()
public function testWithLockLoadStateOnAcquiring()
{
$lock = new Lock(new Key('lock'), new InMemoryStore());
$checkpoint = new Checkpoint('dummy', $lock);
Expand All @@ -86,6 +87,7 @@ public function testwithLockLoadStateOnAcquiring()

$this->assertTrue($checkpoint->acquire($now->modify('1 min')));
$this->assertEquals($now, $checkpoint->time());
$this->assertEquals($now, $checkpoint->from());
$this->assertEquals(0, $checkpoint->index());
$this->assertTrue($lock->isAcquired());
}
Expand All @@ -105,12 +107,13 @@ public function testWithCacheSave()
{
$checkpoint = new Checkpoint('cache', new NoLock(), $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');
$checkpoint->acquire($now->modify('-1 hour'));
$checkpoint->acquire($startedAt = $now->modify('-1 hour'));
$checkpoint->save($now, 3);

$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
$this->assertEquals([$now, 3], $cache->get('cache', fn () => []));
$this->assertSame($startedAt, $checkpoint->from());
$this->assertEquals([$now, 3, $startedAt], $cache->get('cache', fn () => []));
}

public function testWithLockSave()
Expand All @@ -119,11 +122,12 @@ public function testWithLockSave()
$checkpoint = new Checkpoint('dummy', $lock);
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');

$checkpoint->acquire($now->modify('-1 hour'));
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
$checkpoint->save($now, 3);

$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
$this->assertSame($startTime, $checkpoint->from());
}

public function testWithLockAndCacheSave()
Expand All @@ -132,12 +136,12 @@ public function testWithLockAndCacheSave()
$checkpoint = new Checkpoint('dummy', $lock, $cache = new ArrayAdapter());
$now = new \DateTimeImmutable('2020-02-20 20:20:20Z');

$checkpoint->acquire($now->modify('-1 hour'));
$checkpoint->acquire($startTime = $now->modify('-1 hour'));
$checkpoint->save($now, 3);

$this->assertSame($now, $checkpoint->time());
$this->assertSame(3, $checkpoint->index());
$this->assertEquals([$now, 3], $cache->get('dummy', fn () => []));
$this->assertEquals([$now, 3, $startTime], $cache->get('dummy', fn () => []));
}

public function testWithCacheFullCycle()
Expand All @@ -161,7 +165,7 @@ public function testWithCacheFullCycle()
$this->assertSame(3, $lastIndex);
$this->assertEquals($now, $checkpoint->time());
$this->assertSame(0, $checkpoint->index());
$this->assertEquals([$now, 0], $cache->get('cache', fn () => []));
$this->assertEquals([$now, 0, $now], $cache->get('cache', fn () => []));
}

public function testWithLockResetStateAfterLockedAcquiring()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public static function providerGetNextRunDateAgain(): iterable
yield [
$trigger,
new \DateTimeImmutable('2020-02-20T01:59:00+02:00'),
new \DateTimeImmutable('2020-02-20T02:09:00+02:00'),
new \DateTimeImmutable('2020-02-20T02:00:00+02:00'),
];
yield [
$trigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,19 @@
/**
* @author Kevin Bond <kevinbond@gmail.com>
*/
abstract class AbstractDecoratedTrigger implements TriggerInterface
abstract class AbstractDecoratedTrigger implements StatefulTriggerInterface
{
public function __construct(private TriggerInterface $inner)
{
}

public function continue(\DateTimeImmutable $startedAt): void
{
if ($this->inner instanceof StatefulTriggerInterface) {
$this->inner->continue($startedAt);
}
}

final public function inner(): TriggerInterface
{
$inner = $this->inner;
Expand Down
23 changes: 16 additions & 7 deletions 23 src/Symfony/Component/Scheduler/Trigger/PeriodicalTrigger.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@

use Symfony\Component\Scheduler\Exception\InvalidArgumentException;

class PeriodicalTrigger implements TriggerInterface
class PeriodicalTrigger implements StatefulTriggerInterface
{
private float $intervalInSeconds = 0.0;
private \DateTimeImmutable $from;
private ?\DateTimeImmutable $from;
private \DateTimeImmutable $until;
private \DatePeriod $period;
private string $description;
private string|int|float|\DateInterval $interval;

public function __construct(
string|int|float|\DateInterval $interval,
string|\DateTimeImmutable $from = new \DateTimeImmutable(),
string|\DateTimeImmutable|null $from = null,
string|\DateTimeImmutable $until = new \DateTimeImmutable('3000-01-01'),
) {
$this->from = \is_string($from) ? new \DateTimeImmutable($from) : $from;
Expand Down Expand Up @@ -70,7 +71,7 @@ public function __construct(
$this->description = sprintf('every %s seconds', $this->intervalInSeconds);
}
} else {
$this->period = new \DatePeriod($this->from, $i, $this->until);
$this->interval = $i;
}
} catch (\Exception $e) {
throw new InvalidArgumentException(sprintf('Invalid interval "%s": ', $interval instanceof \DateInterval ? 'instance of \DateInterval' : $interval).$e->getMessage(), 0, $e);
Expand All @@ -82,15 +83,22 @@ public function __toString(): string
return $this->description;
}

public function continue(\DateTimeImmutable $startedAt): void
{
$this->from ??= $startedAt;
}

public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
{
$this->from ??= $run;

if ($this->intervalInSeconds) {
if ($this->until <= $run) {
return null;
}

$fromDate = min($this->from, $run);
$from = $fromDate->format('U.u');
$fromDate = $this->from;
$from = (float) $fromDate->format('U.u');
$delta = $run->format('U.u') - $from;
$recurrencesPassed = floor($delta / $this->intervalInSeconds);
$nextRunTimestamp = sprintf('%.6F', ($recurrencesPassed + 1) * $this->intervalInSeconds + $from);
Expand All @@ -103,6 +111,7 @@ public function getNextRunDate(\DateTimeImmutable $run): ?\DateTimeImmutable
return $this->until > $nextRun ? $nextRun : null;
}

$this->period ??= new \DatePeriod($this->from, $this->interval, $this->until);
$iterator = $this->period->getIterator();
while ($run >= $next = $iterator->current()) {
$iterator->next();
Expand Down Expand Up @@ -130,6 +139,6 @@ private function canBeConvertedToSeconds(\DateInterval $interval): bool

private function calcInterval(\DateInterval $interval): float
{
return $this->from->setTimestamp(0)->add($interval)->format('U.u');
return (float) (new \DateTimeImmutable('@0'))->add($interval)->format('U.u');
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Symfony\Component\Scheduler\Trigger;

interface StatefulTriggerInterface extends TriggerInterface
{
public function continue(\DateTimeImmutable $startedAt): void;
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.