Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 888e552

Browse filesBrowse files
bug #45888 [Messenger] Add mysql indexes back and work around deadlocks using soft-delete (nicolas-grekas)
This PR was merged into the 4.4 branch. Discussion ---------- [Messenger] Add mysql indexes back and work around deadlocks using soft-delete | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | Fix #42868 | License | MIT | Doc PR | - #42345 removed some indexes because they create too many deadlocks when running many concurrent consumers. Yet, as reported in the linked issue, those indexes are useful when processing large queues (typically the failed messages queue). #45007 is proposing to add an option to force the indexes back, but I don't like it because it requires ppl to learn about the issue. I looked for a more seamless solution and here is my proposal. Instead of possibly triggering the deadlock during `ack()`/`reject()`, I propose to use a soft-delete there, and do the real delete in `get()`. This makes ack/reject safe because they don't alter any indexes anymore (the delete was), and this moves deadlock exceptions to the same function that creates the locks. This allows the retry mechanism in `DoctrineReceiver` to recover from at most 3 consecutive deadlock exceptions. There can be more, and in this case, the consumer will stop. But this should be much less likely. (yes, I did create a reproducer to work on this issue ;) ) Commits ------- 12271a4 [Messenger] Add mysql indexes back and work around deadlocks using soft-delete
2 parents 5762970 + 12271a4 commit 888e552
Copy full SHA for 888e552

File tree

2 files changed

+20
-63
lines changed
Filter options

2 files changed

+20
-63
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php
-58Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
15-
use Doctrine\DBAL\Configuration;
1615
use Doctrine\DBAL\Connection as DBALConnection;
1716
use Doctrine\DBAL\DBALException;
1817
use Doctrine\DBAL\Driver\Result as DriverResult;
@@ -24,11 +23,8 @@
2423
use Doctrine\DBAL\Query\QueryBuilder;
2524
use Doctrine\DBAL\Result;
2625
use Doctrine\DBAL\Schema\AbstractSchemaManager;
27-
use Doctrine\DBAL\Schema\Schema;
2826
use Doctrine\DBAL\Schema\SchemaConfig;
29-
use Doctrine\DBAL\Schema\TableDiff;
3027
use Doctrine\DBAL\Statement;
31-
use Doctrine\DBAL\Types\Types;
3228
use PHPUnit\Framework\TestCase;
3329
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
3430
use Symfony\Component\Messenger\Exception\TransportException;
@@ -410,58 +406,4 @@ public function providePlatformSql(): iterable
410406
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
411407
];
412408
}
413-
414-
/**
415-
* @dataProvider setupIndicesProvider
416-
*/
417-
public function testSetupIndices(string $platformClass, array $expectedIndices)
418-
{
419-
$driverConnection = $this->createMock(DBALConnection::class);
420-
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
421-
422-
$schemaManager = $this->createMock(AbstractSchemaManager::class);
423-
$schema = new Schema();
424-
$expectedTable = $schema->createTable('messenger_messages');
425-
$expectedTable->addColumn('id', Types::BIGINT);
426-
$expectedTable->setPrimaryKey(['id']);
427-
// Make sure columns for indices exists so addIndex() will not throw
428-
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
429-
$expectedTable->addColumn($columnName, Types::STRING);
430-
}
431-
foreach ($expectedIndices as $indexColumns) {
432-
$expectedTable->addIndex($indexColumns);
433-
}
434-
$schemaManager->method('createSchema')->willReturn($schema);
435-
if (method_exists(DBALConnection::class, 'createSchemaManager')) {
436-
$driverConnection->method('createSchemaManager')->willReturn($schemaManager);
437-
} else {
438-
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
439-
}
440-
441-
$platformMock = $this->createMock($platformClass);
442-
$platformMock
443-
->expects(self::once())
444-
->method('getAlterTableSQL')
445-
->with(self::callback(static function (TableDiff $tableDiff): bool {
446-
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
447-
}))
448-
->willReturn([]);
449-
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);
450-
451-
$connection = new Connection([], $driverConnection);
452-
$connection->setup();
453-
}
454-
455-
public function setupIndicesProvider(): iterable
456-
{
457-
yield 'MySQL' => [
458-
MySQL57Platform::class,
459-
[['delivered_at']],
460-
];
461-
462-
yield 'Other platforms' => [
463-
AbstractPlatform::class,
464-
[['queue_name'], ['available_at'], ['delivered_at']],
465-
];
466-
}
467409
}

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+20-5Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use Doctrine\DBAL\Connection as DBALConnection;
1515
use Doctrine\DBAL\DBALException;
16+
use Doctrine\DBAL\Driver\Exception as DriverException;
1617
use Doctrine\DBAL\Driver\Result as DriverResult;
1718
use Doctrine\DBAL\Exception;
1819
use Doctrine\DBAL\Exception\TableNotFoundException;
@@ -157,6 +158,14 @@ public function send(string $body, array $headers, int $delay = 0): string
157158

158159
public function get(): ?array
159160
{
161+
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
162+
try {
163+
$this->driverConnection->delete($this->configuration['table_name'], ['delivered_at' => '9999-12-31']);
164+
} catch (DriverException $e) {
165+
// Ignore the exception
166+
}
167+
}
168+
160169
get:
161170
$this->driverConnection->beginTransaction();
162171
try {
@@ -224,6 +233,10 @@ public function get(): ?array
224233
public function ack(string $id): bool
225234
{
226235
try {
236+
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
237+
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
238+
}
239+
227240
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
228241
} catch (DBALException|Exception $exception) {
229242
throw new TransportException($exception->getMessage(), 0, $exception);
@@ -233,6 +246,10 @@ public function ack(string $id): bool
233246
public function reject(string $id): bool
234247
{
235248
try {
249+
if ($this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
250+
return $this->driverConnection->update($this->configuration['table_name'], ['delivered_at' => '9999-12-31'], ['id' => $id]) > 0;
251+
}
252+
236253
return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
237254
} catch (DBALException|Exception $exception) {
238255
throw new TransportException($exception->getMessage(), 0, $exception);
@@ -388,6 +405,7 @@ private function getSchema(): Schema
388405
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
389406
->setNotnull(true);
390407
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
408+
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
391409
->setNotnull(true);
392410
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
393411
->setNotnull(true);
@@ -396,11 +414,8 @@ private function getSchema(): Schema
396414
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
397415
->setNotnull(false);
398416
$table->setPrimaryKey(['id']);
399-
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
400-
if (!$this->driverConnection->getDatabasePlatform() instanceof MySQLPlatform) {
401-
$table->addIndex(['queue_name']);
402-
$table->addIndex(['available_at']);
403-
}
417+
$table->addIndex(['queue_name']);
418+
$table->addIndex(['available_at']);
404419
$table->addIndex(['delivered_at']);
405420

406421
return $schema;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.