Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ec4b5c7

Browse filesBrowse files
committed
feature #59601 [Messenger] Add keepalive support (silasjoisten)
This PR was squashed before being merged into the 7.3 branch. Discussion ---------- [Messenger] Add keepalive support | Q | A | ------------- | --- | Branch? | 7.3 | Bug fix? | no | New feature? | yes | Deprecations? | no | Issues | - | License | MIT This Pull Request adds keepalive support to the Doctrine Messenger transport by implementing the keepalive method in the Connection class. This enhancement aligns Doctrine transport with the existing **keepalive** functionality already supported by other Messenger transports, such as Redis and Beanstalkd. The **keepalive** principle was introduced in Symfony 7.2 to address issues where long-running message processing could lead to premature message timeouts and redelivery. By keeping the message “alive” on the transport layer, the message remains marked as being processed until explicitly acknowledged. Other transports like Redis and Beanstalkd have already implemented this feature. This PR extends the functionality to the Doctrine transport. Commits ------- 970c07b [Messenger] Add keepalive support
2 parents 1f83bf1 + 970c07b commit ec4b5c7
Copy full SHA for ec4b5c7

File tree

Expand file treeCollapse file tree

8 files changed

+172
-3
lines changed
Filter options
Expand file treeCollapse file tree

8 files changed

+172
-3
lines changed

‎src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add "keepalive" support
8+
49
7.1
510
---
611

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/ConnectionTest.php
+91Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,97 @@ public function testSendLastInsertIdReturnsInteger()
299299
self::assertSame('1', $id);
300300
}
301301

302+
public function testKeepalive()
303+
{
304+
$queryBuilder = $this->getQueryBuilderMock();
305+
$driverConnection = $this->getDBALConnectionMock();
306+
307+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
308+
309+
$queryBuilder->expects($this->once())
310+
->method('update')
311+
->with('messenger_messages')
312+
->willReturnSelf();
313+
314+
$queryBuilder->expects($this->once())
315+
->method('set')
316+
->with('delivered_at', '?')
317+
->willReturnSelf();
318+
319+
$queryBuilder->expects($this->once())
320+
->method('where')
321+
->with('id = ?')
322+
->willReturnSelf();
323+
324+
$driverConnection->expects($this->once())
325+
->method('beginTransaction');
326+
327+
$driverConnection->expects($this->once())
328+
->method('createQueryBuilder')
329+
->willReturn($queryBuilder);
330+
331+
$driverConnection->expects($this->once())
332+
->method('commit');
333+
334+
$connection->keepalive('1');
335+
}
336+
337+
public function testKeepaliveRollback()
338+
{
339+
$queryBuilder = $this->getQueryBuilderMock();
340+
$driverConnection = $this->getDBALConnectionMock();
341+
342+
$connection = new Connection(['redeliver_timeout' => 30, 'table_name' => 'messenger_messages'], $driverConnection);
343+
344+
$queryBuilder->expects($this->once())
345+
->method('update')
346+
->with('messenger_messages')
347+
->willReturnSelf();
348+
349+
$queryBuilder->expects($this->once())
350+
->method('set')
351+
->with('delivered_at', '?')
352+
->willReturnSelf();
353+
354+
$queryBuilder->expects($this->once())
355+
->method('where')
356+
->with('id = ?')
357+
->willReturnSelf();
358+
359+
$driverConnection->expects($this->once())
360+
->method('beginTransaction');
361+
362+
$driverConnection->expects($this->once())
363+
->method('createQueryBuilder')
364+
->willReturn($queryBuilder);
365+
366+
$driverConnection->expects($this->once())
367+
->method('executeStatement')
368+
->willThrowException($this->createMock(DBALException::class));
369+
370+
$driverConnection->expects($this->never())
371+
->method('commit');
372+
373+
$driverConnection->expects($this->once())
374+
->method('rollBack');
375+
376+
$this->expectException(TransportException::class);
377+
378+
$connection->keepalive('1');
379+
}
380+
381+
public function testKeepaliveThrowsExceptionWhenRedeliverTimeoutIsLessThenInterval()
382+
{
383+
$driverConnection = $this->getDBALConnectionMock();
384+
385+
$connection = new Connection(['redeliver_timeout' => 30], $driverConnection);
386+
387+
$this->expectException(TransportException::class);
388+
$this->expectExceptionMessage('Doctrine redeliver_timeout (30s) cannot be smaller than the keepalive interval (60s).');
389+
390+
$connection->keepalive('1', 60);
391+
}
392+
302393
private function getDBALConnectionMock()
303394
{
304395
$driverConnection = $this->createMock(DBALConnection::class);

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineReceiverTest.php
+16Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,22 @@ public function testRejectThrowsException()
353353
$receiver->reject($envelope);
354354
}
355355

356+
public function testKeepalive()
357+
{
358+
$serializer = $this->createSerializer();
359+
$connection = $this->createMock(Connection::class);
360+
361+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
362+
$receiver = new DoctrineReceiver($connection, $serializer);
363+
364+
$connection
365+
->expects($this->once())
366+
->method('keepalive')
367+
->with('1');
368+
369+
$receiver->keepalive($envelope);
370+
}
371+
356372
private function createDoctrineEnvelope(): array
357373
{
358374
return [

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/DoctrineTransportTest.php
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Framework\TestCase;
1717
use Symfony\Component\Messenger\Bridge\Doctrine\Tests\Fixtures\DummyMessage;
1818
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\Connection;
19+
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineReceivedStamp;
1920
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\DoctrineTransport;
2021
use Symfony\Component\Messenger\Envelope;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -69,6 +70,22 @@ public function testConfigureSchema()
6970
$transport->configureSchema($schema, $dbalConnection, static fn () => true);
7071
}
7172

73+
public function testKeepalive()
74+
{
75+
$transport = $this->getTransport(
76+
null,
77+
$connection = $this->createMock(Connection::class)
78+
);
79+
80+
$envelope = new Envelope(new \stdClass(), [new DoctrineReceivedStamp('1')]);
81+
82+
$connection->expects($this->once())
83+
->method('keepalive')
84+
->with('1');
85+
86+
$transport->keepalive($envelope);
87+
}
88+
7289
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): DoctrineTransport
7390
{
7491
$serializer ??= $this->createMock(SerializerInterface::class);

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/Connection.php
+28Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,34 @@ public function reject(string $id): bool
286286
}
287287
}
288288

289+
public function keepalive(string $id, ?int $seconds = null): void
290+
{
291+
// Check if the redeliver timeout is smaller than the keepalive interval
292+
if (null !== $seconds && $this->configuration['redeliver_timeout'] < $seconds) {
293+
throw new TransportException(\sprintf('Doctrine redeliver_timeout (%ds) cannot be smaller than the keepalive interval (%ds).', $this->configuration['redeliver_timeout'], $seconds));
294+
}
295+
296+
$this->driverConnection->beginTransaction();
297+
try {
298+
$queryBuilder = $this->driverConnection->createQueryBuilder()
299+
->update($this->configuration['table_name'])
300+
->set('delivered_at', '?')
301+
->where('id = ?');
302+
$now = new \DateTimeImmutable('UTC');
303+
$this->executeStatement($queryBuilder->getSQL(), [
304+
$now,
305+
$id,
306+
], [
307+
Types::DATETIME_IMMUTABLE,
308+
]);
309+
310+
$this->driverConnection->commit();
311+
} catch (\Throwable $e) {
312+
$this->driverConnection->rollBack();
313+
throw new TransportException($e->getMessage(), 0, $e);
314+
}
315+
}
316+
289317
public function setup(): void
290318
{
291319
$configuration = $this->driverConnection->getConfiguration();

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineReceiver.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1919
use Symfony\Component\Messenger\Exception\TransportException;
2020
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
21+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
2122
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
2223
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2324
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -26,7 +27,7 @@
2627
/**
2728
* @author Vincent Touzet <vincent.touzet@gmail.com>
2829
*/
29-
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface
30+
class DoctrineReceiver implements ListableReceiverInterface, MessageCountAwareInterface, KeepaliveReceiverInterface
3031
{
3132
private const MAX_RETRIES = 3;
3233
private int $retryingSafetyCounter = 0;
@@ -72,6 +73,11 @@ public function ack(Envelope $envelope): void
7273
});
7374
}
7475

76+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
77+
{
78+
$this->connection->keepalive($this->findDoctrineReceivedStamp($envelope)->getId(), $seconds);
79+
}
80+
7581
public function reject(Envelope $envelope): void
7682
{
7783
$this->withRetryableExceptionRetry(function () use ($envelope) {

‎src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/DoctrineTransport.php
+7-1Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Doctrine\DBAL\Schema\Schema;
1616
use Doctrine\DBAL\Schema\Table;
1717
use Symfony\Component\Messenger\Envelope;
18+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -24,7 +25,7 @@
2425
/**
2526
* @author Vincent Touzet <vincent.touzet@gmail.com>
2627
*/
27-
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface
28+
class DoctrineTransport implements TransportInterface, SetupableTransportInterface, MessageCountAwareInterface, ListableReceiverInterface, KeepaliveReceiverInterface
2829
{
2930
private DoctrineReceiver $receiver;
3031
private DoctrineSender $sender;
@@ -50,6 +51,11 @@ public function reject(Envelope $envelope): void
5051
$this->getReceiver()->reject($envelope);
5152
}
5253

54+
public function keepalive(Envelope $envelope, ?int $seconds = null): void
55+
{
56+
$this->getReceiver()->keepalive($envelope, $seconds);
57+
}
58+
5359
public function getMessageCount(): int
5460
{
5561
return $this->getReceiver()->getMessageCount();

‎src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Doctrine/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": ">=8.2",
2020
"doctrine/dbal": "^3.6|^4",
21-
"symfony/messenger": "^6.4|^7.0",
21+
"symfony/messenger": "^7.2",
2222
"symfony/service-contracts": "^2.5|^3"
2323
},
2424
"require-dev": {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.