Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f7cc0b1

Browse filesBrowse files
committed
Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers
SELECT ... FOR UPDATE row locks also locks indices. Since locking rows and indices is not one atomic operation, this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at resolves this problem.
1 parent 2cee75b commit f7cc0b1
Copy full SHA for f7cc0b1

File tree

2 files changed

+61
-3
lines changed
Filter options

2 files changed

+61
-3
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php
+55Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
15+
use Doctrine\DBAL\Configuration;
1516
use Doctrine\DBAL\Connection as DBALConnection;
1617
use Doctrine\DBAL\DBALException;
1718
use Doctrine\DBAL\Driver\Result as DriverResult;
@@ -23,8 +24,12 @@
2324
use Doctrine\DBAL\Query\QueryBuilder;
2425
use Doctrine\DBAL\Result;
2526
use Doctrine\DBAL\Schema\AbstractSchemaManager;
27+
use Doctrine\DBAL\Schema\Schema;
2628
use Doctrine\DBAL\Schema\SchemaConfig;
29+
use Doctrine\DBAL\Schema\TableDiff;
2730
use Doctrine\DBAL\Statement;
31+
use Doctrine\DBAL\Types\Types;
32+
use PHPUnit\Framework\MockObject\MockObject;
2833
use PHPUnit\Framework\TestCase;
2934
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
3035
use Symfony\Component\Messenger\Exception\TransportException;
@@ -402,4 +407,54 @@ public function providePlatformSql(): iterable
402407
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
403408
];
404409
}
410+
411+
/**
412+
* @dataProvider setupIndicesProvider
413+
*/
414+
public function testSetupIndices(string $platformClass, array $expectedIndices)
415+
{
416+
$driverConnection = $this->createMock(DBALConnection::class);
417+
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
418+
419+
$schemaManager = $this->createMock(AbstractSchemaManager::class);
420+
$schema = new Schema();
421+
$expectedTable = $schema->createTable('messenger_messages');
422+
$expectedTable->addColumn('id', Types::BIGINT);
423+
$expectedTable->setPrimaryKey(['id']);
424+
// Make sure columns for indices exists so addIndex() will not throw
425+
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
426+
$expectedTable->addColumn($columnName, Types::STRING);
427+
}
428+
foreach ($expectedIndices as $indexColumns) {
429+
$expectedTable->addIndex($indexColumns);
430+
}
431+
$schemaManager->method('createSchema')->willReturn($schema);
432+
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
433+
434+
$platformMock = $this->createMock($platformClass);
435+
$platformMock
436+
->expects(self::once())
437+
->method('getAlterTableSQL')
438+
->with(self::callback(static function (TableDiff $tableDiff): bool {
439+
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
440+
}))
441+
->willReturn([]);
442+
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);
443+
444+
$connection = new Connection([], $driverConnection);
445+
$connection->setup();
446+
}
447+
448+
public function setupIndicesProvider(): iterable
449+
{
450+
yield 'MySQL' => [
451+
MySQL57Platform::class,
452+
[['delivered_at']],
453+
];
454+
455+
yield 'Other platforms' => [
456+
AbstractPlatform::class,
457+
[['queue_name'], ['available_at'], ['delivered_at']],
458+
];
459+
}
405460
}

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+6-3Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Doctrine\DBAL\Exception;
1818
use Doctrine\DBAL\Exception\TableNotFoundException;
1919
use Doctrine\DBAL\LockMode;
20+
use Doctrine\DBAL\Platforms\MySqlPlatform;
2021
use Doctrine\DBAL\Query\QueryBuilder;
2122
use Doctrine\DBAL\Result;
2223
use Doctrine\DBAL\Schema\Comparator;
@@ -386,7 +387,6 @@ private function getSchema(): Schema
386387
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
387388
->setNotnull(true);
388389
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
389-
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
390390
->setNotnull(true);
391391
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
392392
->setNotnull(true);
@@ -395,8 +395,11 @@ private function getSchema(): Schema
395395
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
396396
->setNotnull(false);
397397
$table->setPrimaryKey(['id']);
398-
$table->addIndex(['queue_name']);
399-
$table->addIndex(['available_at']);
398+
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
399+
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
400+
$table->addIndex(['queue_name']);
401+
$table->addIndex(['available_at']);
402+
}
400403
$table->addIndex(['delivered_at']);
401404

402405
return $schema;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.