Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 12b852f

Browse filesBrowse files
committed
bug #32052 [Messenger] fix AMQP delay queue to be per exchange (Tobion)
This PR was squashed before being merged into the 4.3 branch (closes #32052). Discussion ---------- [Messenger] fix AMQP delay queue to be per exchange | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | #32050 | License | MIT | Doc PR | this makes the delay/retry work when having several exchanges or renaming your exchange. also the delay setup did not declare the target exchange. so if you only do delayed messages for a connection, auto-setup forgot to actually create the target exchange. Commits ------- 5bc3364 [Messenger] fix AMQP delay queue to be per exchange
2 parents 99c44a3 + 5bc3364 commit 12b852f
Copy full SHA for 12b852f

File tree

Expand file treeCollapse file tree

2 files changed

+70
-75
lines changed
Filter options
Expand file treeCollapse file tree

2 files changed

+70
-75
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
+53-58Lines changed: 53 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
*/
2424
class ConnectionTest extends TestCase
2525
{
26+
private const DEFAULT_EXCHANGE_NAME = 'messages';
27+
2628
/**
2729
* @expectedException \InvalidArgumentException
2830
* @expectedExceptionMessage The given AMQP DSN "amqp://:" is invalid.
@@ -40,9 +42,9 @@ public function testItCanBeConstructedWithDefaults()
4042
'port' => 5672,
4143
'vhost' => '/',
4244
], [
43-
'name' => 'messages',
45+
'name' => self::DEFAULT_EXCHANGE_NAME,
4446
], [
45-
'messages' => [],
47+
self::DEFAULT_EXCHANGE_NAME => [],
4648
]),
4749
Connection::fromDsn('amqp://')
4850
);
@@ -196,7 +198,7 @@ public function testItUsesANormalConnectionByDefault()
196198
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
197199
$amqpConnection->expects($this->once())->method('connect');
198200

199-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
201+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
200202
$connection->publish('body');
201203
}
202204

@@ -213,7 +215,7 @@ public function testItAllowsToUseAPersistentConnection()
213215
$amqpChannel->expects($this->once())->method('isConnected')->willReturn(true);
214216
$amqpConnection->expects($this->once())->method('pconnect');
215217

216-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?persistent=true', [], $factory);
218+
$connection = Connection::fromDsn('amqp://localhost?persistent=true', [], $factory);
217219
$connection->publish('body');
218220
}
219221

@@ -226,13 +228,12 @@ public function testItSetupsTheConnectionWithDefaults()
226228
$amqpExchange = $this->createMock(\AMQPExchange::class)
227229
);
228230

229-
$amqpExchange->method('getName')->willReturn('exchange_name');
230231
$amqpExchange->expects($this->once())->method('declareExchange');
231232
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
232233
$amqpQueue->expects($this->once())->method('declareQueue');
233-
$amqpQueue->expects($this->once())->method('bind')->with('exchange_name', null);
234+
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
234235

235-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
236+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
236237
$connection->publish('body');
237238
}
238239

@@ -250,21 +251,20 @@ public function testItSetupsTheConnection()
250251
$factory->method('createExchange')->willReturn($amqpExchange);
251252
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
252253

253-
$amqpExchange->method('getName')->willReturn('exchange_name');
254254
$amqpExchange->expects($this->once())->method('declareExchange');
255255
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
256256
$amqpQueue0->expects($this->once())->method('declareQueue');
257257
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
258-
['exchange_name', 'binding_key0'],
259-
['exchange_name', 'binding_key1']
258+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
259+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key1']
260260
);
261261
$amqpQueue1->expects($this->once())->method('declareQueue');
262262
$amqpQueue1->expects($this->exactly(2))->method('bind')->withConsecutive(
263-
['exchange_name', 'binding_key2'],
264-
['exchange_name', 'binding_key3']
263+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key2'],
264+
[self::DEFAULT_EXCHANGE_NAME, 'binding_key3']
265265
);
266266

267-
$dsn = 'amqp://localhost/%2f/messages?'.
267+
$dsn = 'amqp://localhost?'.
268268
'exchange[default_publish_routing_key]=routing_key&'.
269269
'queues[queue0][binding_keys][0]=binding_key0&'.
270270
'queues[queue0][binding_keys][1]=binding_key1&'.
@@ -284,18 +284,17 @@ public function testItCanDisableTheSetup()
284284
$amqpExchange = $this->createMock(\AMQPExchange::class)
285285
);
286286

287-
$amqpExchange->method('getName')->willReturn('exchange_name');
288287
$amqpExchange->expects($this->never())->method('declareExchange');
289288
$amqpQueue->expects($this->never())->method('declareQueue');
290289
$amqpQueue->expects($this->never())->method('bind');
291290

292-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => 'false'], $factory);
291+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => 'false'], $factory);
293292
$connection->publish('body');
294293

295-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['auto_setup' => false], $factory);
294+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => false], $factory);
296295
$connection->publish('body');
297296

298-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?auto_setup=false', [], $factory);
297+
$connection = Connection::fromDsn('amqp://localhost?auto_setup=false', [], $factory);
299298
$connection->publish('body');
300299
}
301300

@@ -312,9 +311,9 @@ public function testSetChannelPrefetchWhenSetup()
312311
$amqpChannel->expects($this->exactly(2))->method('isConnected')->willReturn(true);
313312

314313
$amqpChannel->expects($this->exactly(2))->method('setPrefetchCount')->with(2);
315-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?prefetch_count=2', [], $factory);
314+
$connection = Connection::fromDsn('amqp://localhost?prefetch_count=2', [], $factory);
316315
$connection->setup();
317-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', ['prefetch_count' => 2], $factory);
316+
$connection = Connection::fromDsn('amqp://localhost', ['prefetch_count' => 2], $factory);
318317
$connection->setup();
319318
}
320319

@@ -329,29 +328,29 @@ public function testItDelaysTheMessage()
329328
$factory->method('createChannel')->willReturn($amqpChannel);
330329
$factory->method('createQueue')->willReturn($delayQueue);
331330
$factory->method('createExchange')->will($this->onConsecutiveCalls(
332-
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
333-
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
331+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
332+
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
334333
));
335334

336-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
337-
$amqpExchange->method('getName')->willReturn('messages');
335+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
336+
$amqpExchange->expects($this->once())->method('declareExchange');
338337

339338
$delayExchange->expects($this->once())->method('setName')->with('delay');
340339
$delayExchange->expects($this->once())->method('declareExchange');
341-
$delayExchange->method('getName')->willReturn('delay');
342340

343-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__5000');
341+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__5000');
344342
$delayQueue->expects($this->once())->method('setArguments')->with([
345343
'x-message-ttl' => 5000,
346-
'x-dead-letter-exchange' => 'messages',
344+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
345+
'x-dead-letter-routing-key' => '',
347346
]);
348347

349348
$delayQueue->expects($this->once())->method('declareQueue');
350-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__5000');
349+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__5000');
351350

352-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
351+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
353352

354-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
353+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
355354
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
356355
}
357356

@@ -366,41 +365,41 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
366365
$factory->method('createChannel')->willReturn($amqpChannel);
367366
$factory->method('createQueue')->willReturn($delayQueue);
368367
$factory->method('createExchange')->will($this->onConsecutiveCalls(
369-
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
370-
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
368+
$amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(),
369+
$delayExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock()
371370
));
372371

373-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
374-
$amqpExchange->method('getName')->willReturn('messages');
372+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
373+
$amqpExchange->expects($this->once())->method('declareExchange');
375374

376375
$delayExchange->expects($this->once())->method('setName')->with('delay');
377376
$delayExchange->expects($this->once())->method('declareExchange');
378-
$delayExchange->method('getName')->willReturn('delay');
379377

380378
$connectionOptions = [
381379
'retry' => [
382380
'dead_routing_key' => 'my_dead_routing_key',
383381
],
384382
];
385383

386-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
384+
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
387385

388-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue__120000');
386+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages__120000');
389387
$delayQueue->expects($this->once())->method('setArguments')->with([
390388
'x-message-ttl' => 120000,
391-
'x-dead-letter-exchange' => 'messages',
389+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
390+
'x-dead-letter-routing-key' => '',
392391
]);
393392

394393
$delayQueue->expects($this->once())->method('declareQueue');
395-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay__120000');
394+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages__120000');
396395

397-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay__120000', AMQP_NOPARAM, ['headers' => []]);
396+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
398397
$connection->publish('{}', [], 120000);
399398
}
400399

401400
/**
402401
* @expectedException \AMQPException
403-
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"delay":{"routing_key_pattern":"delay_%routing_key%_%delay%","exchange_name":"delay","queue_name_pattern":"delay_queue_%routing_key%_%delay%"},"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
402+
* @expectedExceptionMessage Could not connect to the AMQP server. Please verify the provided DSN. ({"host":"localhost","port":5672,"vhost":"\/","login":"user","password":"********"})
404403
*/
405404
public function testObfuscatePasswordInDsn()
406405
{
@@ -415,7 +414,7 @@ public function testObfuscatePasswordInDsn()
415414
new \AMQPConnectionException('Oups.')
416415
);
417416

418-
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost/%2f/messages', [], $factory);
417+
$connection = Connection::fromDsn('amqp://user:secretpassword@localhost', [], $factory);
419418
$connection->channel();
420419
}
421420

@@ -430,7 +429,7 @@ public function testItCanPublishWithTheDefaultRoutingKey()
430429

431430
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
432431

433-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=routing_key', [], $factory);
432+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=routing_key', [], $factory);
434433
$connection->publish('body');
435434
}
436435

@@ -445,7 +444,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
445444

446445
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
447446

448-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
447+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
449448
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
450449
}
451450

@@ -460,39 +459,35 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
460459
$factory->method('createChannel')->willReturn($amqpChannel);
461460
$factory->method('createQueue')->willReturn($delayQueue);
462461
$factory->method('createExchange')->will($this->onConsecutiveCalls(
463-
$delayExchange = $this->createMock(\AMQPExchange::class),
464-
$amqpExchange = $this->createMock(\AMQPExchange::class)
462+
$amqpExchange = $this->createMock(\AMQPExchange::class),
463+
$delayExchange = $this->createMock(\AMQPExchange::class)
465464
));
466465

467-
$amqpExchange->expects($this->once())->method('setName')->with('messages');
468-
$amqpExchange->method('getName')->willReturn('messages');
466+
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
467+
$amqpExchange->expects($this->once())->method('declareExchange');
469468

470469
$delayExchange->expects($this->once())->method('setName')->with('delay');
471470
$delayExchange->expects($this->once())->method('declareExchange');
472-
$delayExchange->method('getName')->willReturn('delay');
473471

474472
$connectionOptions = [
475473
'retry' => [
476474
'dead_routing_key' => 'my_dead_routing_key',
477475
],
478476
];
479477

480-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, $factory);
478+
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
481479

482-
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_routing_key_120000');
480+
$delayQueue->expects($this->once())->method('setName')->with('delay_queue_messages_routing_key_120000');
483481
$delayQueue->expects($this->once())->method('setArguments')->with([
484482
'x-message-ttl' => 120000,
485-
'x-dead-letter-exchange' => 'messages',
483+
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
484+
'x-dead-letter-routing-key' => 'routing_key',
486485
]);
487-
$delayQueue->expects($this->once())->method('setArgument')->with(
488-
'x-dead-letter-routing-key',
489-
'routing_key'
490-
);
491486

492487
$delayQueue->expects($this->once())->method('declareQueue');
493-
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_routing_key_120000');
488+
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_messages_routing_key_120000');
494489

495-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
490+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
496491
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
497492
}
498493

@@ -512,7 +507,7 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
512507
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
513508
);
514509

515-
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
510+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
516511
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
517512
}
518513
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.