Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit f4054c1

Browse filesBrowse files
committed
Setup queues once in AMQP
1 parent 44e98db commit f4054c1
Copy full SHA for f4054c1

File tree

3 files changed

+44
-45
lines changed
Filter options

3 files changed

+44
-45
lines changed

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public static function fromDsn(string $dsn, array $options = [], HttpClientInter
118118
'wait_time' => (int) $options['wait_time'],
119119
'poll_timeout' => $options['poll_timeout'],
120120
'visibility_timeout' => $options['visibility_timeout'],
121-
'auto_setup' => (bool) $options['auto_setup'],
121+
'auto_setup' => filter_var($options['auto_setup'], \FILTER_VALIDATE_BOOLEAN),
122122
'queue_name' => (string) $options['queue_name'],
123123
];
124124

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+18-12Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,24 @@ public function testItCanDisableTheSetup()
421421
$connection->publish('body');
422422
}
423423

424+
public function testItSetupQueuesOnce()
425+
{
426+
$factory = new TestAmqpFactory(
427+
$amqpConnection = $this->createMock(\AMQPConnection::class),
428+
$amqpChannel = $this->createMock(\AMQPChannel::class),
429+
$amqpQueue = $this->createMock(\AMQPQueue::class),
430+
$amqpExchange = $this->createMock(\AMQPExchange::class)
431+
);
432+
433+
$amqpExchange->expects($this->once())->method('declareExchange');
434+
$amqpQueue->expects($this->once())->method('declareQueue');
435+
$amqpQueue->expects($this->once())->method('bind');
436+
437+
$connection = Connection::fromDsn('amqp://localhost', ['auto_setup' => true], $factory);
438+
$connection->publish('body');
439+
$connection->publish('body');
440+
}
441+
424442
public function testSetChannelPrefetchWhenSetup()
425443
{
426444
$factory = new TestAmqpFactory(
@@ -453,15 +471,9 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
453471
$delayQueue = $this->createMock(\AMQPQueue::class)
454472
));
455473
$factory->method('createExchange')->will($this->onConsecutiveCalls(
456-
$amqpExchange = $this->createMock(\AMQPExchange::class),
457474
$delayExchange = $this->createMock(\AMQPExchange::class)
458475
));
459476

460-
$amqpExchange->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
461-
$amqpExchange->expects($this->once())->method('declareExchange');
462-
$amqpQueue->expects($this->once())->method('setName')->with(self::DEFAULT_EXCHANGE_NAME);
463-
$amqpQueue->expects($this->once())->method('declareQueue');
464-
465477
$delayExchange->expects($this->once())->method('setName')->with('delays');
466478
$delayExchange->expects($this->once())->method('declareExchange');
467479
$delayExchange->expects($this->once())->method('publish');
@@ -479,11 +491,9 @@ public function testItDelaysTheMessage()
479491
$factory->method('createConnection')->willReturn($amqpConnection);
480492
$factory->method('createChannel')->willReturn($amqpChannel);
481493
$factory->method('createQueue')->will($this->onConsecutiveCalls(
482-
$this->createMock(\AMQPQueue::class),
483494
$delayQueue = $this->createMock(\AMQPQueue::class)
484495
));
485496
$factory->method('createExchange')->will($this->onConsecutiveCalls(
486-
$this->createMock(\AMQPExchange::class),
487497
$delayExchange = $this->createMock(\AMQPExchange::class)
488498
));
489499

@@ -513,11 +523,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
513523
$factory->method('createConnection')->willReturn($amqpConnection);
514524
$factory->method('createChannel')->willReturn($amqpChannel);
515525
$factory->method('createQueue')->will($this->onConsecutiveCalls(
516-
$this->createMock(\AMQPQueue::class),
517526
$delayQueue = $this->createMock(\AMQPQueue::class)
518527
));
519528
$factory->method('createExchange')->will($this->onConsecutiveCalls(
520-
$this->createMock(\AMQPExchange::class),
521529
$delayExchange = $this->createMock(\AMQPExchange::class)
522530
));
523531

@@ -653,11 +661,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
653661
$factory->method('createConnection')->willReturn($amqpConnection);
654662
$factory->method('createChannel')->willReturn($amqpChannel);
655663
$factory->method('createQueue')->will($this->onConsecutiveCalls(
656-
$this->createMock(\AMQPQueue::class),
657664
$delayQueue = $this->createMock(\AMQPQueue::class)
658665
));
659666
$factory->method('createExchange')->will($this->onConsecutiveCalls(
660-
$this->createMock(\AMQPExchange::class),
661667
$delayExchange = $this->createMock(\AMQPExchange::class)
662668
));
663669

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+25-32Lines changed: 25 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ class Connection
7979
private $exchangeOptions;
8080
private $queuesOptions;
8181
private $amqpFactory;
82+
private $autoSetup;
83+
private $autoSetupDelay;
8284

8385
/**
8486
* @var \AMQPChannel|null
@@ -112,6 +114,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
112114
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
113115
],
114116
], $connectionOptions);
117+
$this->autoSetup = $this->autoSetupDelay = $connectionOptions['auto_setup'] ?? true;
115118
$this->exchangeOptions = $exchangeOptions;
116119
$this->queuesOptions = $queuesOptions;
117120
$this->amqpFactory = $amqpFactory ?: new AmqpFactory();
@@ -207,6 +210,9 @@ public static function fromDsn(string $dsn, array $options = [], AmqpFactory $am
207210
$exchangeOptions = $amqpOptions['exchange'];
208211
$queuesOptions = $amqpOptions['queues'];
209212
unset($amqpOptions['queues'], $amqpOptions['exchange']);
213+
if (isset($amqpOptions['auto_setup'])) {
214+
$amqpOptions['auto_setup'] = filter_var($amqpOptions['auto_setup'], \FILTER_VALIDATE_BOOLEAN);
215+
}
210216

211217
$queuesOptions = array_map(function ($queueOptions) {
212218
if (!\is_array($queueOptions)) {
@@ -285,7 +291,7 @@ public function publish(string $body, array $headers = [], int $delayInMs = 0, A
285291
return;
286292
}
287293

288-
if ($this->shouldSetup()) {
294+
if ($this->autoSetup) {
289295
$this->setupExchangeAndQueues();
290296
}
291297

@@ -347,8 +353,8 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
347353

348354
private function setupDelay(int $delay, ?string $routingKey)
349355
{
350-
if ($this->shouldSetup()) {
351-
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
356+
if ($this->autoSetupDelay) {
357+
$this->setupDelayExchange(); // setup delay exchange and normal exchange for delay queue to DLX messages to
352358
}
353359

354360
$queue = $this->createDelayQueue($delay, $routingKey);
@@ -418,23 +424,12 @@ public function get(string $queueName): ?\AMQPEnvelope
418424
{
419425
$this->clearWhenDisconnected();
420426

421-
if ($this->shouldSetup()) {
427+
if ($this->autoSetup) {
422428
$this->setupExchangeAndQueues();
423429
}
424430

425-
try {
426-
if (false !== $message = $this->queue($queueName)->get()) {
427-
return $message;
428-
}
429-
} catch (\AMQPQueueException $e) {
430-
if (404 === $e->getCode() && $this->shouldSetup()) {
431-
// If we get a 404 for the queue, it means we need to set up the exchange & queue.
432-
$this->setupExchangeAndQueues();
433-
434-
return $this->get($queueName);
435-
}
436-
437-
throw $e;
431+
if (false !== $message = $this->queue($queueName)->get()) {
432+
return $message;
438433
}
439434

440435
return null;
@@ -452,8 +447,12 @@ public function nack(\AMQPEnvelope $message, string $queueName, int $flags = \AM
452447

453448
public function setup(): void
454449
{
455-
$this->setupExchangeAndQueues();
456-
$this->getDelayExchange()->declareExchange();
450+
if ($this->autoSetup) {
451+
$this->setupExchangeAndQueues();
452+
}
453+
if ($this->autoSetupDelay) {
454+
$this->setupDelayExchange();
455+
}
457456
}
458457

459458
private function setupExchangeAndQueues(): void
@@ -466,6 +465,13 @@ private function setupExchangeAndQueues(): void
466465
$this->queue($queueName)->bind($this->exchangeOptions['name'], $bindingKey, $queueConfig['binding_arguments'] ?? []);
467466
}
468467
}
468+
$this->autoSetup = false;
469+
}
470+
471+
private function setupDelayExchange(): void
472+
{
473+
$this->getDelayExchange()->declareExchange();
474+
$this->autoSetupDelay = false;
469475
}
470476

471477
/**
@@ -558,19 +564,6 @@ private function clearWhenDisconnected(): void
558564
}
559565
}
560566

561-
private function shouldSetup(): bool
562-
{
563-
if (!\array_key_exists('auto_setup', $this->connectionOptions)) {
564-
return true;
565-
}
566-
567-
if (\in_array($this->connectionOptions['auto_setup'], [false, 'false'], true)) {
568-
return false;
569-
}
570-
571-
return true;
572-
}
573-
574567
private function getDefaultPublishRoutingKey(): ?string
575568
{
576569
return $this->exchangeOptions['default_publish_routing_key'] ?? null;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.