From 72ac5bfbe5d1b2e3be270dfb161c112456a32bed Mon Sep 17 00:00:00 2001 From: Asmir Mustafic Date: Sat, 6 Aug 2022 08:06:01 +0200 Subject: [PATCH] Always attempt to listen for notifications --- .../Transport/PostgreSqlConnectionTest.php | 67 +++++++++++++++++++ .../Transport/PostgreSqlConnection.php | 16 +---- 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php index f1ffffbb5687a..e8e00d97b3876 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Tests/Transport/PostgreSqlConnectionTest.php @@ -11,6 +11,11 @@ namespace Symfony\Component\Messenger\Bridge\Doctrine\Tests\Transport; +use Doctrine\DBAL\Cache\ArrayResult; +use Doctrine\DBAL\Cache\ArrayStatement; +use Doctrine\DBAL\Platforms\PostgreSQLPlatform; +use Doctrine\DBAL\Query\QueryBuilder; +use Doctrine\DBAL\Result; use Doctrine\DBAL\Schema\Table; use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection; @@ -42,6 +47,68 @@ public function testUnserialize() $connection->__wakeup(); } + public function testListenOnConnection() + { + $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); + + $driverConnection + ->expects(self::any()) + ->method('getDatabasePlatform') + ->willReturn(new PostgreSQLPlatform()); + + $driverConnection + ->expects(self::any()) + ->method('createQueryBuilder') + ->willReturn(new QueryBuilder($driverConnection)); + + $wrappedConnection = new class() { + private $notifyCalls = 0; + + public function pgsqlGetNotify() + { + ++$this->notifyCalls; + + return false; + } + + public function countNotifyCalls() + { + return $this->notifyCalls; + } + }; + + // dbal 2.x + if (interface_exists(Result::class)) { + $driverConnection + ->expects(self::exactly(2)) + ->method('getWrappedConnection') + ->willReturn($wrappedConnection); + + $driverConnection + ->expects(self::any()) + ->method('executeQuery') + ->willReturn(new ArrayStatement([])); + } else { + // dbal 3.x + $driverConnection + ->expects(self::exactly(2)) + ->method('getNativeConnection') + ->willReturn($wrappedConnection); + + $driverConnection + ->expects(self::any()) + ->method('executeQuery') + ->willReturn(new Result(new ArrayResult([]), $driverConnection)); + } + $connection = new PostgreSqlConnection(['table_name' => 'queue_table'], $driverConnection); + + $connection->get(); // first time we have queueEmptiedAt === null, fallback on the parent implementation + $connection->get(); + $connection->get(); + + $this->assertSame(2, $wrappedConnection->countNotifyCalls()); + } + public function testGetExtraSetupSql() { $driverConnection = $this->createMock(\Doctrine\DBAL\Connection::class); diff --git a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php index 39615f7344be1..3691a9383f293 100644 --- a/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php +++ b/src/Symfony/Component/Messenger/Bridge/Doctrine/Transport/PostgreSqlConnection.php @@ -33,8 +33,6 @@ final class PostgreSqlConnection extends Connection 'get_notify_timeout' => 0, ]; - private $listening = false; - public function __sleep(): array { throw new \BadMethodCallException('Cannot serialize '.__CLASS__); @@ -62,12 +60,9 @@ public function get(): ?array return parent::get(); } - if (!$this->listening) { - // This is secure because the table name must be a valid identifier: - // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS - $this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name'])); - $this->listening = true; - } + // This is secure because the table name must be a valid identifier: + // https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS + $this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name'])); if (method_exists($this->driverConnection, 'getNativeConnection')) { $wrappedConnection = $this->driverConnection->getNativeConnection(); @@ -150,11 +145,6 @@ private function createTriggerFunctionName(): string private function unlisten() { - if (!$this->listening) { - return; - } - $this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name'])); - $this->listening = false; } }