Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit ee0c97b

Browse filesBrowse files
committed
Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers
SELECT ... FOR UPDATE row locks also locks indices. Since locking rows and indices is not one atomic operation, this might cause deadlocks when running multiple workers. Removing indices on queue_name and available_at resolves this problem.
1 parent 2cee75b commit ee0c97b
Copy full SHA for ee0c97b

File tree

2 files changed

+63
-3
lines changed
Filter options

2 files changed

+63
-3
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php
+57Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;
1313

1414
use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
15+
use Doctrine\DBAL\Configuration;
1516
use Doctrine\DBAL\Connection as DBALConnection;
1617
use Doctrine\DBAL\DBALException;
1718
use Doctrine\DBAL\Driver\Result as DriverResult;
@@ -23,8 +24,12 @@
2324
use Doctrine\DBAL\Query\QueryBuilder;
2425
use Doctrine\DBAL\Result;
2526
use Doctrine\DBAL\Schema\AbstractSchemaManager;
27+
use Doctrine\DBAL\Schema\Schema;
2628
use Doctrine\DBAL\Schema\SchemaConfig;
29+
use Doctrine\DBAL\Schema\TableDiff;
2730
use Doctrine\DBAL\Statement;
31+
use Doctrine\DBAL\Types\Types;
32+
use PHPUnit\Framework\MockObject\MockObject;
2833
use PHPUnit\Framework\TestCase;
2934
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
3035
use Symfony\Component\Messenger\Exception\TransportException;
@@ -402,4 +407,56 @@ public function providePlatformSql(): iterable
402407
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
403408
];
404409
}
410+
411+
/**
412+
* @param AbstractPlatform&MockObject $platformMock
413+
* @param string[][] $expectedIndices
414+
* @dataProvider setupIndicesProvider
415+
*/
416+
public function testSetupIndices(AbstractPlatform $platformMock, array $expectedIndices)
417+
{
418+
$schema = new Schema();
419+
$expectedTable = $schema->createTable('messenger_messages');
420+
$expectedTable->addColumn('id', Types::BIGINT);
421+
$expectedTable->setPrimaryKey(['id']);
422+
// Make sure columns for indices exists so addIndex() will not throw
423+
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
424+
$expectedTable->addColumn($columnName, Types::STRING);
425+
}
426+
foreach ($expectedIndices as $indexColumns) {
427+
$expectedTable->addIndex($indexColumns);
428+
}
429+
430+
$schemaManager = $this->createMock(AbstractSchemaManager::class);
431+
$schemaManager->method('createSchema')->willReturn($schema);
432+
$driverConnection = $this->createMock(DBALConnection::class);
433+
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
434+
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);
435+
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);
436+
$driverConnection->expects(self::never())->method('executeStatement');
437+
438+
$platformMock
439+
->expects(self::once())
440+
->method('getAlterTableSQL')
441+
->with(self::callback(static function (TableDiff $tableDiff): bool {
442+
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
443+
}))
444+
->willReturn([]);
445+
446+
$connection = new Connection([], $driverConnection);
447+
$connection->setup();
448+
}
449+
450+
public function setupIndicesProvider(): iterable
451+
{
452+
yield 'MySQL' => [
453+
$this->createMock(MySQL57Platform::class),
454+
[['delivered_at']],
455+
];
456+
457+
yield 'Other platforms' => [
458+
$this->createMock(AbstractPlatform::class),
459+
[['queue_name'], ['available_at'], ['delivered_at']],
460+
];
461+
}
405462
}

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+6-3Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Doctrine\DBAL\Exception;
1818
use Doctrine\DBAL\Exception\TableNotFoundException;
1919
use Doctrine\DBAL\LockMode;
20+
use Doctrine\DBAL\Platforms\MySqlPlatform;
2021
use Doctrine\DBAL\Query\QueryBuilder;
2122
use Doctrine\DBAL\Result;
2223
use Doctrine\DBAL\Schema\Comparator;
@@ -386,7 +387,6 @@ private function getSchema(): Schema
386387
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
387388
->setNotnull(true);
388389
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
389-
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
390390
->setNotnull(true);
391391
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
392392
->setNotnull(true);
@@ -395,8 +395,11 @@ private function getSchema(): Schema
395395
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
396396
->setNotnull(false);
397397
$table->setPrimaryKey(['id']);
398-
$table->addIndex(['queue_name']);
399-
$table->addIndex(['available_at']);
398+
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
399+
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
400+
$table->addIndex(['queue_name']);
401+
$table->addIndex(['available_at']);
402+
}
400403
$table->addIndex(['delivered_at']);
401404

402405
return $schema;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.