Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7080940

Browse filesBrowse files
committed
bug #42345 [Messenger] Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers (jeroennoten)
This PR was merged into the 4.4 branch. Discussion ---------- [Messenger] Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers SELECT ... FOR UPDATE locks rows but also relevant indices. Since locking rows and indices is not one atomic operation, this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at resolves this problem. | Q | A | ------------- | --- | Branch? | 4.4 | Bug fix? | yes | New feature? | no | Deprecations? | no | Tickets | Fix #41541 #39041 | License | MIT Using Doctrine transport with multiple consumers occasionally results in MySQL deadlocks while removing a message from the messages database table. This can be reproduced consistently by setting up a default `async` queue with the Doctrine transport and creating an empty `TestMessage` and `TestMessageHandler`. Create a command that dispatches 10000 of these messages in a for loop en start 4 message consumers. After a while, several consumers report a deadlock: ``` In Connection.php line 227: [Symfony\Component\Messenger\Exception\TransportException] An exception occurred while executing 'DELETE FROM messenger_messages WHERE id = ?' with params ["32903"]: SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction Exception trace: at /var/www/vendor/symfony/messenger/Transport/Doctrine/Connection.php:227 Symfony\Component\Messenger\Transport\Doctrine\Connection->ack() at /var/www/vendor/symfony/messenger/Transport/Doctrine/DoctrineReceiver.php:79 Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver->ack() at /var/www/vendor/symfony/messenger/Transport/Doctrine/DoctrineTransport.php:50 Symfony\Component\Messenger\Transport\Doctrine\DoctrineTransport->ack() at /var/www/vendor/symfony/messenger/Worker.php:150 Symfony\Component\Messenger\Worker->handleMessage() at /var/www/vendor/symfony/messenger/Worker.php:81 Symfony\Component\Messenger\Worker->run() at /var/www/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:202 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /var/www/vendor/symfony/console/Command/Command.php:255 Symfony\Component\Console\Command\Command->run() at /var/www/vendor/symfony/console/Application.php:1027 Symfony\Component\Console\Application->doRunCommand() at /var/www/vendor/symfony/framework-bundle/Console/Application.php:97 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /var/www/vendor/symfony/console/Application.php:273 Symfony\Component\Console\Application->doRun() at /var/www/vendor/symfony/framework-bundle/Console/Application.php:83 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /var/www/vendor/symfony/console/Application.php:149 Symfony\Component\Console\Application->run() at /var/www/bin/console:42 ``` A similar problem with Laravel's queue worker (and a solution) is reported here: laravel/framework#31660 The solution is to remove indices on the `queue_name` and `available_at` columns. After removing these indices, I could not reproduce the issue anymore. Also, I did not notice any performance degradations. Commits ------- 8c3c0a3 Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers
2 parents f4df205 + 8c3c0a3 commit 7080940
Copy full SHA for 7080940

File tree

2 files changed

+60
-3
lines changed
Filter options

2 files changed

+60
-3
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php
+54Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
15+
use Doctrine\DBAL\Configuration;
1516
use Doctrine\DBAL\Connection as DBALConnection;
1617
use Doctrine\DBAL\DBALException;
1718
use Doctrine\DBAL\Driver\Result as DriverResult;
@@ -23,8 +24,11 @@
2324
use Doctrine\DBAL\Query\QueryBuilder;
2425
use Doctrine\DBAL\Result;
2526
use Doctrine\DBAL\Schema\AbstractSchemaManager;
27+
use Doctrine\DBAL\Schema\Schema;
2628
use Doctrine\DBAL\Schema\SchemaConfig;
29+
use Doctrine\DBAL\Schema\TableDiff;
2730
use Doctrine\DBAL\Statement;
31+
use Doctrine\DBAL\Types\Types;
2832
use PHPUnit\Framework\TestCase;
2933
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
3034
use Symfony\Component\Messenger\Exception\TransportException;
@@ -402,4 +406,54 @@ public function providePlatformSql(): iterable
402406
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
403407
];
404408
}
409+
410+
/**
411+
* @dataProvider setupIndicesProvider
412+
*/
413+
public function testSetupIndices(string $platformClass, array $expectedIndices)
414+
{
415+
$driverConnection = $this->createMock(DBALConnection::class);
416+
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
417+
418+
$schemaManager = $this->createMock(AbstractSchemaManager::class);
419+
$schema = new Schema();
420+
$expectedTable = $schema->createTable('messenger_messages');
421+
$expectedTable->addColumn('id', Types::BIGINT);
422+
$expectedTable->setPrimaryKey(['id']);
423+
// Make sure columns for indices exists so addIndex() will not throw
424+
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
425+
$expectedTable->addColumn($columnName, Types::STRING);
426+
}
427+
foreach ($expectedIndices as $indexColumns) {
428+
$expectedTable->addIndex($indexColumns);
429+
}
430+
$schemaManager->method('createSchema')->willReturn($schema);
431+
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
432+
433+
$platformMock = $this->createMock($platformClass);
434+
$platformMock
435+
->expects(self::once())
436+
->method('getAlterTableSQL')
437+
->with(self::callback(static function (TableDiff $tableDiff): bool {
438+
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
439+
}))
440+
->willReturn([]);
441+
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);
442+
443+
$connection = new Connection([], $driverConnection);
444+
$connection->setup();
445+
}
446+
447+
public function setupIndicesProvider(): iterable
448+
{
449+
yield 'MySQL' => [
450+
MySQL57Platform::class,
451+
[['delivered_at']],
452+
];
453+
454+
yield 'Other platforms' => [
455+
AbstractPlatform::class,
456+
[['queue_name'], ['available_at'], ['delivered_at']],
457+
];
458+
}
405459
}

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+6-3Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Doctrine\DBAL\Exception;
1818
use Doctrine\DBAL\Exception\TableNotFoundException;
1919
use Doctrine\DBAL\LockMode;
20+
use Doctrine\DBAL\Platforms\MySqlPlatform;
2021
use Doctrine\DBAL\Query\QueryBuilder;
2122
use Doctrine\DBAL\Result;
2223
use Doctrine\DBAL\Schema\Comparator;
@@ -386,7 +387,6 @@ private function getSchema(): Schema
386387
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
387388
->setNotnull(true);
388389
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
389-
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
390390
->setNotnull(true);
391391
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
392392
->setNotnull(true);
@@ -395,8 +395,11 @@ private function getSchema(): Schema
395395
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
396396
->setNotnull(false);
397397
$table->setPrimaryKey(['id']);
398-
$table->addIndex(['queue_name']);
399-
$table->addIndex(['available_at']);
398+
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
399+
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
400+
$table->addIndex(['queue_name']);
401+
$table->addIndex(['available_at']);
402+
}
400403
$table->addIndex(['delivered_at']);
401404

402405
return $schema;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.