Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[DoctrineMessenger] [WIP] Postgres LISTEN/NOTIFY improvement proposition for better handling of time limit, multi-queue worker and delayed tasks #47666

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: 7.4
Choose a base branch
Loading
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Doctrine\EventListener;

use Psr\Log\LoggerInterface;
use RuntimeException;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Bridge\Doctrine\Transport\PostgreSqlConnection;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Worker;

/**
* Operational only when DoctrineTransport, PostgreSQL and pg_notify is used. On worker's idle event, the worker is
* going to be put to sleep until woken up by pg_notify.
*/
class PostgreSqlWaitForPgNotifyOnIdleListener implements EventSubscriberInterface
{
private ?LoggerInterface $logger;
private ?PostgreSqlConnection $leaderConnection;

/** @var array<string, PostgreSqlConnection> Keyed by Messenger's transport names */
private array $postgreSqlConnectionsMapping;

public function __construct(LoggerInterface $logger = null)
{
$this->logger = $logger;
}

public function registerPostgreSqlConnectionCandidate(string $messengerTransportName, PostgreSqlConnection $connection): void
{
// @todo-PR The registration this way is needed because there's no way to retrieve DoctrineReceiver's Connection
// otherwise. I would prefer to add a DoctrineReceiver::getConnection() and DoctrineTransport::getConnection(),
// as this would reduce the "magic" here. As things currently stand, this event listener is a noop if
// DoctrineTransportFactory is not used to instantiate transports.
$this->postgreSqlConnectionsMapping[$messengerTransportName] = $connection;
}

public function onWorkerStarted(WorkerStartedEvent $event): void
{
$this->leaderConnection = $this->electLeaderPostgreSqlConnection($event->getWorker());

if ($this->leaderConnection) {
// It's important to start pg listening on the very start of the worker, so that no pg events are missed
// between the first get() and "idleWait()"
$this->leaderConnection->registerPgNotifyListener();
}
}

public function onWorkerRunning(WorkerRunningEvent $event): void
{
if (!$this->leaderConnection || !$event->isWorkerIdle()) {
return;
}

$this->logger?->debug('Worker going into sleep until PostgreSQL LISTEN/NOTIFY wakeup');

// Note that this sync-wait is meant to "wake up" on anything potentially interesting. It means that it
// will sometimes wake up for irrelevant reasons, which is fine.
$this->leaderConnection->sleepUntilPgNotify($this->choosePgNotifyTimeout($event->getWorker()));
}

public static function getSubscribedEvents(): array
{
return [
WorkerStartedEvent::class => 'onWorkerStarted',
WorkerRunningEvent::class => 'onWorkerRunning',
];
}

private function electLeaderPostgreSqlConnection(Worker $worker): ?PostgreSqlConnection
{
/** @var PostgreSqlConnection[] $usedPostgreSqlConnections */
$usedPostgreSqlConnections = [];
$nonPostgreSqlConnectionIsUsed = false;
foreach ($worker->getMetadata()->getTransportNames() as $transportName) {
$postgreSqlConnection = $this->postgreSqlConnectionsMapping[$transportName] ?? null;

if (!$postgreSqlConnection) {
$nonPostgreSqlConnectionIsUsed = true;
}

$usedPostgreSqlConnections[] = $postgreSqlConnection;
}

if (!$usedPostgreSqlConnections) {
return null;
}

// elect the leader
$leaderPostgreSqlConnection = null;
foreach ($usedPostgreSqlConnections as $usedPostgreSqlConnection) {
$connectionConfiguration = $usedPostgreSqlConnection->getConfiguration();

if (!$connectionConfiguration['use_notify'] || $connectionConfiguration['get_notify_timeout'] < 1) {
continue;
}

$leaderPostgreSqlConnection = $usedPostgreSqlConnection;
break;
}

if (!$leaderPostgreSqlConnection) {
return null;
}

if ($nonPostgreSqlConnectionIsUsed) {
throw new \RuntimeException('Cannot start Messenger Worker with a mix of PostgreSQL queues that use the pg_notify feature, and with other queues. Please either start a Worker with only pg_notify PostgreSQL queues or disable the pg_notify feature on those queues.');
}

// validate that all used connection have common configuration
$leaderConnectionConfiguration = $leaderPostgreSqlConnection->getConfiguration();
$requiredCommonConnectionConfigurationKeys = [
'connection',
'table_name',
'get_notify_timeout',
];
foreach ($usedPostgreSqlConnections as $usedPostgreSqlConnection) {
$connectionConfiguration = $usedPostgreSqlConnection->getConfiguration();

foreach ($requiredCommonConnectionConfigurationKeys as $configurationKey) {
if ($leaderConnectionConfiguration[$configurationKey] === $connectionConfiguration[$configurationKey]) {
continue;
}

throw new RuntimeException("Cannot start Messenger Worker with a set of PostgreSQL queues that do not have common configuration. Conflict configuration key: \"{$configurationKey}\". Expected value: \"{$leaderConnectionConfiguration[$configurationKey]}\", actual value: \"{$connectionConfiguration[$configurationKey]}\"");
}
}

return $leaderPostgreSqlConnection;
}

private function choosePgNotifyTimeout(Worker $worker): ?int
{
// @todo Used to choose a sleep timeout as explained in https://github.com/symfony/symfony/issues/46862, proposition 2.
// Note: don't forget to consider 'redeliver_timeout' option in the used pg connections' configuration.

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ public function __construct(array $configuration, DBALConnection $driverConnecti

public function reset()
{
$this->queueEmptiedAt = null;
}

public function getConfiguration(): array
Expand Down Expand Up @@ -211,13 +210,9 @@ public function get(): ?array

if (false === $doctrineEnvelope) {
$this->driverConnection->commit();
$this->queueEmptiedAt = microtime(true) * 1000;

return null;
}
// Postgres can "group" notifications having the same channel and payload
// We need to be sure to empty the queue before blocking again
$this->queueEmptiedAt = null;

$doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
use Doctrine\Persistence\ConnectionRegistry;
use Symfony\Component\Messenger\Bridge\Doctrine\EventListener\PostgreSqlWaitForPgNotifyOnIdleListener;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
Expand All @@ -24,15 +25,18 @@
class DoctrineTransportFactory implements TransportFactoryInterface
{
private ConnectionRegistry $registry;
private PostgreSqlWaitForPgNotifyOnIdleListener $postgreSqlWaitForPgNotifyOnIdleListener;

public function __construct(ConnectionRegistry $registry)
public function __construct(ConnectionRegistry $registry, PostgreSqlWaitForPgNotifyOnIdleListener $postgreSqlWaitForPgNotifyOnIdleListener)
{
$this->registry = $registry;
$this->postgreSqlWaitForPgNotifyOnIdleListener = $postgreSqlWaitForPgNotifyOnIdleListener;
}

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
$useNotify = ($options['use_notify'] ?? true);
$transportName = $options['transport_name'] ?? null;
unset($options['transport_name'], $options['use_notify']);
// Always allow PostgreSQL-specific keys, to be able to transparently fallback to the native driver when LISTEN/NOTIFY isn't available
$configuration = PostgreSqlConnection::buildConfiguration($dsn, $options);
Expand All @@ -45,6 +49,10 @@ public function createTransport(string $dsn, array $options, SerializerInterface

if ($useNotify && $driverConnection->getDatabasePlatform() instanceof PostgreSQLPlatform) {
$connection = new PostgreSqlConnection($configuration, $driverConnection);

if ($transportName) {
$this->postgreSqlWaitForPgNotifyOnIdleListener->registerPostgreSqlConnectionCandidate($transportName, $connection);
}
} else {
$connection = new Connection($configuration, $driverConnection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,40 +54,6 @@ public function reset()
$this->unlisten();
}

public function get(): ?array
{
if (null === $this->queueEmptiedAt) {
return parent::get();
}

// This is secure because the table name must be a valid identifier:
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
$this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name']));

if (method_exists($this->driverConnection, 'getNativeConnection')) {
$wrappedConnection = $this->driverConnection->getNativeConnection();
} else {
$wrappedConnection = $this->driverConnection;
while (method_exists($wrappedConnection, 'getWrappedConnection')) {
$wrappedConnection = $wrappedConnection->getWrappedConnection();
}
}

$notification = $wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $this->configuration['get_notify_timeout']);
if (
// no notifications, or for another table or queue
(false === $notification || $notification['message'] !== $this->configuration['table_name'] || $notification['payload'] !== $this->configuration['queue_name']) &&
// delayed messages
(microtime(true) * 1000 - $this->queueEmptiedAt < $this->configuration['check_delayed_interval'])
) {
usleep(1000);

return null;
}

return parent::get();
}

public function setup(): void
{
parent::setup();
Expand Down Expand Up @@ -143,8 +109,47 @@ private function createTriggerFunctionName(): string
return sprintf('%1$s.notify_%2$s', $tableConfig[0], $tableConfig[1]);
}

/**
* @internal
*/
public function registerPgNotifyListener(): void
{
// This is secure because the table name must be a valid identifier:
// https://www.postgresql.org/docs/current/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
// Running "LISTEN" potentially multiple times is expected and desired, as discussed here:
// https://github.com/symfony/symfony/pull/47209
$this->executeStatement(sprintf('LISTEN "%s"', $this->configuration['table_name']));
}

private function unlisten()
{
$this->executeStatement(sprintf('UNLISTEN "%s"', $this->configuration['table_name']));
}

/**
* @internal
*
* Note that this sleep will wake up on anything interesting happening on the observed db table. Wake ups
* with no new queue tasks should be expected and gracefully handled.
*/
public function sleepUntilPgNotify(?int $preferredPgNotifyTimeout): void
{
$notifyTimeout = $preferredPgNotifyTimeout ?? $this->configuration['get_notify_timeout'];
if ($notifyTimeout < 1) {
return;
}

if (method_exists($this->driverConnection, 'getNativeConnection')) {
$wrappedConnection = $this->driverConnection->getNativeConnection();
} else {
$wrappedConnection = $this->driverConnection;
while (method_exists($wrappedConnection, 'getWrappedConnection')) {
$wrappedConnection = $wrappedConnection->getWrappedConnection();
}
}

$this->registerPgNotifyListener();

$wrappedConnection->pgsqlGetNotify(\PDO::FETCH_ASSOC, $notifyTimeout);
}
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.