13
13
14
14
use Doctrine \DBAL \Connection as DBALConnection ;
15
15
use Doctrine \DBAL \DBALException ;
16
+ use Doctrine \DBAL \Driver \Exception as DriverException ;
16
17
use Doctrine \DBAL \Driver \Result as DriverResult ;
17
18
use Doctrine \DBAL \Exception ;
18
19
use Doctrine \DBAL \Exception \TableNotFoundException ;
@@ -157,6 +158,14 @@ public function send(string $body, array $headers, int $delay = 0): string
157
158
158
159
public function get (): ?array
159
160
{
161
+ if ($ this ->driverConnection ->getDatabasePlatform () instanceof MySQLPlatform) {
162
+ try {
163
+ $ this ->driverConnection ->delete ($ this ->configuration ['table_name ' ], ['delivered_at ' => '9999-12-31 ' ]);
164
+ } catch (DriverException $ e ) {
165
+ // Ignore the exception
166
+ }
167
+ }
168
+
160
169
get:
161
170
$ this ->driverConnection ->beginTransaction ();
162
171
try {
@@ -224,6 +233,10 @@ public function get(): ?array
224
233
public function ack (string $ id ): bool
225
234
{
226
235
try {
236
+ if ($ this ->driverConnection ->getDatabasePlatform () instanceof MySQLPlatform) {
237
+ return $ this ->driverConnection ->update ($ this ->configuration ['table_name ' ], ['delivered_at ' => '9999-12-31 ' ], ['id ' => $ id ]) > 0 ;
238
+ }
239
+
227
240
return $ this ->driverConnection ->delete ($ this ->configuration ['table_name ' ], ['id ' => $ id ]) > 0 ;
228
241
} catch (DBALException |Exception $ exception ) {
229
242
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
@@ -233,6 +246,10 @@ public function ack(string $id): bool
233
246
public function reject (string $ id ): bool
234
247
{
235
248
try {
249
+ if ($ this ->driverConnection ->getDatabasePlatform () instanceof MySQLPlatform) {
250
+ return $ this ->driverConnection ->update ($ this ->configuration ['table_name ' ], ['delivered_at ' => '9999-12-31 ' ], ['id ' => $ id ]) > 0 ;
251
+ }
252
+
236
253
return $ this ->driverConnection ->delete ($ this ->configuration ['table_name ' ], ['id ' => $ id ]) > 0 ;
237
254
} catch (DBALException |Exception $ exception ) {
238
255
throw new TransportException ($ exception ->getMessage (), 0 , $ exception );
@@ -388,6 +405,7 @@ private function getSchema(): Schema
388
405
$ table ->addColumn ('headers ' , self ::$ useDeprecatedConstants ? Type::TEXT : Types::TEXT )
389
406
->setNotnull (true );
390
407
$ table ->addColumn ('queue_name ' , self ::$ useDeprecatedConstants ? Type::STRING : Types::STRING )
408
+ ->setLength (190 ) // MySQL 5.6 only supports 191 characters on an indexed column in utf8mb4 mode
391
409
->setNotnull (true );
392
410
$ table ->addColumn ('created_at ' , self ::$ useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE )
393
411
->setNotnull (true );
@@ -396,11 +414,8 @@ private function getSchema(): Schema
396
414
$ table ->addColumn ('delivered_at ' , self ::$ useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE )
397
415
->setNotnull (false );
398
416
$ table ->setPrimaryKey (['id ' ]);
399
- // No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
400
- if (!$ this ->driverConnection ->getDatabasePlatform () instanceof MySQLPlatform) {
401
- $ table ->addIndex (['queue_name ' ]);
402
- $ table ->addIndex (['available_at ' ]);
403
- }
417
+ $ table ->addIndex (['queue_name ' ]);
418
+ $ table ->addIndex (['available_at ' ]);
404
419
$ table ->addIndex (['delivered_at ' ]);
405
420
406
421
return $ schema ;
0 commit comments