Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Remove indices in messenger table on MySQL to prevent deadlocks while removing messages when running multiple consumers #42345

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Tests\Transport\Doctrine;

use Doctrine\DBAL\Abstraction\Result as AbstractionResult;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\Result as DriverResult;
Expand All @@ -23,8 +24,11 @@
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\AbstractSchemaManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\SchemaConfig;
use Doctrine\DBAL\Schema\TableDiff;
use Doctrine\DBAL\Statement;
use Doctrine\DBAL\Types\Types;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
Expand Down Expand Up @@ -402,4 +406,54 @@ public function providePlatformSql(): iterable
'SELECT m.* FROM messenger_messages m WITH (UPDLOCK, ROWLOCK) WHERE (m.delivered_at is null OR m.delivered_at < ?) AND (m.available_at <= ?) AND (m.queue_name = ?) ORDER BY available_at ASC OFFSET 0 ROWS FETCH NEXT 1 ROWS ONLY ',
];
}

/**
* @dataProvider setupIndicesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());

$schemaManager = $this->createMock(AbstractSchemaManager::class);
$schema = new Schema();
$expectedTable = $schema->createTable('messenger_messages');
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
$driverConnection->method('getSchemaManager')->willReturn($schemaManager);

$platformMock = $this->createMock($platformClass);
$platformMock
->expects(self::once())
->method('getAlterTableSQL')
->with(self::callback(static function (TableDiff $tableDiff): bool {
return 0 === \count($tableDiff->addedIndexes) && 0 === \count($tableDiff->changedIndexes) && 0 === \count($tableDiff->removedIndexes);
}))
->willReturn([]);
$driverConnection->method('getDatabasePlatform')->willReturn($platformMock);

$connection = new Connection([], $driverConnection);
$connection->setup();
}

public function setupIndicesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Doctrine\DBAL\Exception;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\LockMode;
use Doctrine\DBAL\Platforms\MySqlPlatform;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Result;
use Doctrine\DBAL\Schema\Comparator;
Expand Down Expand Up @@ -386,7 +387,6 @@ private function getSchema(): Schema
$table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
->setNotnull(true);
$table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
->setLength(190) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
fabpot marked this conversation as resolved.
Show resolved Hide resolved
->setNotnull(true);
$table->addColumn('created_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(true);
Expand All @@ -395,8 +395,11 @@ private function getSchema(): Schema
$table->addColumn('delivered_at', self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE)
->setNotnull(false);
$table->setPrimaryKey(['id']);
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
$table->addIndex(['delivered_at']);

return $schema;
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.