Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 2a31b47

Browse filesBrowse files
committed
[Messenger] Doctrine transport - add an option for the id strategy
1 parent 1e083a0 commit 2a31b47
Copy full SHA for 2a31b47

File tree

3 files changed

+65
-16
lines changed
Filter options

3 files changed

+65
-16
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/ConnectionTest.php
+8Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,12 @@ public function testItThrowsAnExceptionIfAnExtraOptionsInDefinedInDSN()
211211
{
212212
Connection::buildConfiguration('doctrine://default?new_option=woops');
213213
}
214+
215+
/**
216+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
217+
*/
218+
public function testItThrowsAnExceptionIfTheIdStrategyIsNotSupported()
219+
{
220+
Connection::buildConfiguration('doctrine://default?id_strategy=not_supported');
221+
}
214222
}

‎src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/Doctrine/DoctrineIntegrationTest.php
+17-5Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ protected function setUp()
4343
public function testConnectionSendAndGet()
4444
{
4545
$this->connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
46-
$encoded = $this->connection->get();
47-
$this->assertEquals('{"message": "Hi"}', $encoded['body']);
48-
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
46+
$doctrineEnvelop = $this->connection->get();
47+
$this->assertEquals('{"message": "Hi"}', $doctrineEnvelop['body']);
48+
$this->assertEquals(['type' => DummyMessage::class], $doctrineEnvelop['headers']);
4949
}
5050

5151
public function testSendWithDelay()
@@ -96,8 +96,8 @@ public function testItRetrieveTheFirstAvailableMessage()
9696
'available_at' => '2019-03-15 12:30:00',
9797
]);
9898

99-
$encoded = $this->connection->get();
100-
$this->assertEquals('{"message": "Hi available"}', $encoded['body']);
99+
$doctrineEnvelop = $this->connection->get();
100+
$this->assertEquals('{"message": "Hi available"}', $doctrineEnvelop['body']);
101101
}
102102

103103
public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
@@ -124,4 +124,16 @@ public function testItRetrieveTheMessageThatIsOlderThanRedeliverTimeout()
124124
$this->assertEquals('{"message": "Hi requeued"}', $next['body']);
125125
$this->connection->reject($next['id']);
126126
}
127+
128+
public function testUuidStrategy()
129+
{
130+
$connection = new Connection(['id_strategy' => Connection::ID_STRATEGY_UUID], $this->driverConnection);
131+
$this->driverConnection->exec('DROP TABLE messenger_messages');
132+
$connection->setup();
133+
$connection->send('{"message": "Hi uuid"}', ['type' => DummyMessage::class]);
134+
$message = $this->connection->get();
135+
$this->assertEquals(1, preg_match('/[a-f0-9]{8}\-[a-f0-9]{4}\-[a-f0-9]{4}\-(8|9|a|b)[a-f0-9]{3}\-[a-f0-9]{12}/', $message['id']));
136+
$connection->reject($message['id']);
137+
$this->driverConnection->exec('DROP TABLE messenger_messages');
138+
}
127139
}

‎src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
+40-11Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class Connection
2929
* Configuration of the connection.
3030
*
3131
* * table_name: name of the table
32+
* * id_strategy: Strategy for the id field. uuid or auto_increment: Default: auto_increment
3233
* * connection: name of the Doctrine's entity manager
3334
* * queue_name: name of the queue
3435
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
@@ -39,11 +40,18 @@ class Connection
3940
private $driverConnection;
4041
const DEFAULT_OPTIONS = [
4142
'table_name' => 'messenger_messages',
43+
'id_strategy' => self::ID_STRATEGY_AUTO_INCREMENT,
4244
'queue_name' => 'default',
4345
'redeliver_timeout' => 3600,
4446
'loop_sleep' => 200000,
4547
'auto_setup' => true,
4648
];
49+
const ID_STRATEGY_AUTO_INCREMENT = 'auto_increment';
50+
const ID_STRATEGY_UUID = 'uuid';
51+
const ID_STRATEGIES = [
52+
self::ID_STRATEGY_AUTO_INCREMENT,
53+
self::ID_STRATEGY_UUID,
54+
];
4755

4856
public function __construct(array $configuration, DBALConnection $driverConnection)
4957
{
@@ -71,12 +79,17 @@ public static function buildConfiguration($dsn, array $options = [])
7179
$configuration = [
7280
'connection' => $components['host'],
7381
'table_name' => $options['table_name'] ?? ($query['table_name'] ?? self::DEFAULT_OPTIONS['table_name']),
82+
'id_strategy' => $options['id_strategy'] ?? ($query['id_strategy'] ?? self::DEFAULT_OPTIONS['id_strategy']),
7483
'queue_name' => $options['queue_name'] ?? ($query['queue_name'] ?? self::DEFAULT_OPTIONS['queue_name']),
7584
'redeliver_timeout' => $options['redeliver_timeout'] ?? ($query['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']),
7685
'loop_sleep' => $options['loop_sleep'] ?? ($query['loop_sleep'] ?? self::DEFAULT_OPTIONS['loop_sleep']),
7786
'auto_setup' => $options['auto_setup'] ?? ($query['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup']),
7887
];
7988

89+
if (!\in_array($configuration['id_strategy'], self::ID_STRATEGIES)) {
90+
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $configuration['id_strategy'], implode(', ', self::ID_STRATEGIES)));
91+
}
92+
8093
// check for extra keys in options
8194
$optionsExtraKeys = array_diff(array_keys($options), array_keys($configuration));
8295
if (0 < \count($optionsExtraKeys)) {
@@ -102,15 +115,19 @@ public function send(string $body, array $headers, int $delay = 0): void
102115
$now = (\DateTime::createFromFormat('U.u', microtime(true)));
103116
$availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));
104117

118+
$values = [
119+
'body' => ':body',
120+
'headers' => ':headers',
121+
'queue_name' => ':queue_name',
122+
'created_at' => ':created_at',
123+
'available_at' => ':available_at',
124+
];
125+
if (self::ID_STRATEGY_UUID === $this->configuration['id_strategy']) {
126+
$values['id'] = $this->driverConnection->getDatabasePlatform()->getGuidExpression();
127+
}
105128
$queryBuilder = $this->driverConnection->createQueryBuilder()
106129
->insert($this->configuration['table_name'])
107-
->values([
108-
'body' => ':body',
109-
'headers' => ':headers',
110-
'queue_name' => ':queue_name',
111-
'created_at' => ':created_at',
112-
'available_at' => ':available_at',
113-
]);
130+
->values($values);
114131

115132
$this->executeQuery($queryBuilder->getSQL(), [
116133
':body' => $body,
@@ -223,11 +240,23 @@ private function getSchema(): Schema
223240
{
224241
$schema = new Schema();
225242
$table = $schema->createTable($this->configuration['table_name']);
226-
$table->addColumn('id', Type::BIGINT)
227-
->setAutoincrement(true)
228-
->setNotnull(true);
229-
$table->addColumn('body', Type::TEXT)
243+
switch ($this->configuration['id_strategy']) {
244+
case self::ID_STRATEGY_UUID:
245+
$table->addColumn('id', Type::GUID)
246+
->setNotnull(true);
247+
break;
248+
case self::ID_STRATEGY_AUTO_INCREMENT:
249+
$table->addColumn('id', Type::BIGINT)
250+
->setAutoincrement(true)
251+
->setNotnull(true);
252+
break;
253+
default:
254+
throw new TransportException(sprintf('Unknown id_strategy "%s". Supported strategies are [%s]', $this->configuration['id_strategy'], self::ID_STRATEGIES));
255+
}
256+
if ($this->configuration['id_strategy']) {
257+
$table->addColumn('body', Type::TEXT)
230258
->setNotnull(true);
259+
}
231260
$table->addColumn('headers', Type::JSON)
232261
->setNotnull(true);
233262
$table->addColumn('queue_name', Type::STRING)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.