From e92b4ad8607f517743d451a44cba441d5c8ae9f9 Mon Sep 17 00:00:00 2001 From: Samuel ROZE Date: Sun, 22 Apr 2018 20:58:02 +0100 Subject: [PATCH 1/2] Add a retry mechanism for AMQP messages Add some tests and remove the intrication between the receiver and the connection Configure each TTL individually and the dead routing key The `ttls` array is 0-indexed Update the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values) Catches failed retries and forward other messages' attributes --- .../Resources/config/messenger.xml | 1 + .../Transport/AmqpExt/AmqpReceiverTest.php | 56 +++++- .../AmqpExt/AmqpTransportFactoryTest.php | 2 +- .../Transport/AmqpExt/ConnectionTest.php | 92 ++++++++++ .../Transport/AmqpExt/AmqpReceiver.php | 33 +++- .../Transport/AmqpExt/AmqpTransport.php | 7 +- .../AmqpExt/AmqpTransportFactory.php | 7 +- .../Transport/AmqpExt/Connection.php | 162 +++++++++++++++++- 8 files changed, 342 insertions(+), 18 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml index 0b09978d33870..5e3b5a6db2d89 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml @@ -64,6 +64,7 @@ %kernel.debug% + diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php index 5666598337655..0508670503547 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php @@ -69,7 +69,7 @@ public function testItNonAcknowledgeTheMessageIfAnExceptionHappened() $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); $connection->method('get')->willReturn($envelope); - $connection->expects($this->once())->method('nack')->with($envelope); + $connection->expects($this->once())->method('nack')->with($envelope, AMQP_REQUEUE); $receiver = new AmqpReceiver($serializer, $connection); $receiver->receive(function () { @@ -101,6 +101,60 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn throw new WillNeverWorkException('Well...'); }); } + + public function testItPublishesTheMessageForRetryIfSuchConfiguration() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn(array( + 'type' => DummyMessage::class, + )); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3))); + $connection->method('publishForRetry')->with($envelope)->willReturn(true); + + $connection->expects($this->once())->method('ack')->with($envelope); + + $receiver = new AmqpReceiver($serializer, $connection); + $receiver->receive(function (Envelope $envelope) use ($receiver) { + $this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage()); + $receiver->stop(); + }); + } + + /** + * @expectedException \Symfony\Component\Messenger\Tests\Transport\AmqpExt\InterruptException + */ + public function testItThrowsTheExceptionIfTheRetryPublishDidNotWork() + { + $serializer = new Serializer( + new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder())) + ); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getBody')->willReturn('{"message": "Hi"}'); + $envelope->method('getHeaders')->willReturn(array( + 'type' => DummyMessage::class, + )); + + $connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock(); + $connection->method('get')->willReturn($envelope); + $connection->method('getConnectionConfiguration')->willReturn(array('retry' => array('attempts' => 3))); + $connection->method('publishForRetry')->with($envelope)->willReturn(false); + + $connection->expects($this->never())->method('ack')->with($envelope); + + $receiver = new AmqpReceiver($serializer, $connection); + $receiver->receive(function () { + throw new InterruptException('Well...'); + }); + } } class InterruptException extends \Exception diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php index 53a98e2263a07..0e2396f63af1b 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpTransportFactoryTest.php @@ -41,7 +41,7 @@ public function testItCreatesTheTransport() true ); - $expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true), array('foo' => 'bar'), true); + $expectedTransport = new AmqpTransport($encoder, $decoder, Connection::fromDsn('amqp://localhost', array('foo' => 'bar'), true)); $this->assertEquals($expectedTransport, $factory->createTransport('amqp://localhost', array('foo' => 'bar'))); } diff --git a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php index 511f8fe3c4414..56b809dd364ea 100644 --- a/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php @@ -189,6 +189,98 @@ public function testItCanDisableTheSetup() $connection = Connection::fromDsn('amqp://localhost/%2f/messages?queue[routing_key]=my_key&auto-setup=false', array(), true, $factory); $connection->publish('body'); } + + public function testItRetriesTheMessage() + { + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(); + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(); + $retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(); + + $factory = $this->getMockBuilder(AmqpFactory::class)->getMock(); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->willReturn($retryQueue); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + )); + + $amqpExchange->expects($this->once())->method('setName')->with('messages'); + $amqpExchange->method('getName')->willReturn('messages'); + + $retryExchange->expects($this->once())->method('setName')->with('retry'); + $retryExchange->expects($this->once())->method('declareExchange'); + $retryExchange->method('getName')->willReturn('retry'); + + $retryQueue->expects($this->once())->method('setName')->with('retry_queue_1'); + $retryQueue->expects($this->once())->method('setArguments')->with(array( + 'x-message-ttl' => 10000, + 'x-dead-letter-exchange' => 'messages', + )); + + $retryQueue->expects($this->once())->method('declareQueue'); + $retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_1'); + + $envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $envelope->method('getHeader')->with('symfony-messenger-attempts')->willReturn(false); + $envelope->method('getHeaders')->willReturn(array('x-some-headers' => 'foo')); + $envelope->method('getBody')->willReturn('{}'); + + $retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_1', AMQP_NOPARAM, array('headers' => array('x-some-headers' => 'foo', 'symfony-messenger-attempts' => 1))); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', array('retry' => array('attempts' => 3)), false, $factory); + $connection->publishForRetry($envelope); + } + + public function testItRetriesTheMessageWithADifferentRoutingKeyAndTTLs() + { + $amqpConnection = $this->getMockBuilder(\AMQPConnection::class)->disableOriginalConstructor()->getMock(); + $amqpChannel = $this->getMockBuilder(\AMQPChannel::class)->disableOriginalConstructor()->getMock(); + $retryQueue = $this->getMockBuilder(\AMQPQueue::class)->disableOriginalConstructor()->getMock(); + + $factory = $this->getMockBuilder(AmqpFactory::class)->getMock(); + $factory->method('createConnection')->willReturn($amqpConnection); + $factory->method('createChannel')->willReturn($amqpChannel); + $factory->method('createQueue')->willReturn($retryQueue); + $factory->method('createExchange')->will($this->onConsecutiveCalls( + $retryExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock(), + $amqpExchange = $this->getMockBuilder(\AMQPExchange::class)->disableOriginalConstructor()->getMock() + )); + + $amqpExchange->expects($this->once())->method('setName')->with('messages'); + $amqpExchange->method('getName')->willReturn('messages'); + + $retryExchange->expects($this->once())->method('setName')->with('retry'); + $retryExchange->expects($this->once())->method('declareExchange'); + $retryExchange->method('getName')->willReturn('retry'); + + $connectionOptions = array( + 'retry' => array( + 'attempts' => 3, + 'dead_routing_key' => 'my_dead_routing_key', + 'ttl' => array(30000, 60000, 120000), + ), + ); + + $connection = Connection::fromDsn('amqp://localhost/%2f/messages', $connectionOptions, false, $factory); + + $messageRetriedTwice = $this->getMockBuilder(\AMQPEnvelope::class)->getMock(); + $messageRetriedTwice->method('getHeader')->with('symfony-messenger-attempts')->willReturn('2'); + $messageRetriedTwice->method('getHeaders')->willReturn(array('symfony-messenger-attempts' => '2')); + $messageRetriedTwice->method('getBody')->willReturn('{}'); + + $retryQueue->expects($this->once())->method('setName')->with('retry_queue_3'); + $retryQueue->expects($this->once())->method('setArguments')->with(array( + 'x-message-ttl' => 120000, + 'x-dead-letter-exchange' => 'messages', + )); + + $retryQueue->expects($this->once())->method('declareQueue'); + $retryQueue->expects($this->once())->method('bind')->with('retry', 'attempt_3'); + + $retryExchange->expects($this->once())->method('publish')->with('{}', 'attempt_3', AMQP_NOPARAM, array('headers' => array('symfony-messenger-attempts' => 3))); + $connection->publishForRetry($messageRetriedTwice); + } } class TestAmqpFactory extends AmqpFactory diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php index 0e6fbff8ee340..4c88e6166b845 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface; use Symfony\Component\Messenger\Transport\ReceiverInterface; use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; @@ -22,14 +23,18 @@ */ class AmqpReceiver implements ReceiverInterface { + private const DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS = 200000; + private $decoder; private $connection; + private $logger; private $shouldStop; - public function __construct(DecoderInterface $decoder, Connection $connection) + public function __construct(DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null) { $this->decoder = $decoder; $this->connection = $connection; + $this->logger = $logger; } /** @@ -39,10 +44,11 @@ public function receive(callable $handler): void { while (!$this->shouldStop) { $AMQPEnvelope = $this->connection->get(); + if (null === $AMQPEnvelope) { $handler(null); - usleep($this->connection->getConnectionCredentials()['loop_sleep'] ?? 200000); + usleep($this->connection->getConnectionConfiguration()['loop_sleep'] ?? self::DEFAULT_LOOP_SLEEP_IN_MICRO_SECONDS); if (\function_exists('pcntl_signal_dispatch')) { pcntl_signal_dispatch(); } @@ -62,9 +68,25 @@ public function receive(callable $handler): void throw $e; } catch (\Throwable $e) { - $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + try { + $retried = $this->connection->publishForRetry($AMQPEnvelope); + } catch (\Throwable $retryException) { + $this->logger && $this->logger->warning(sprintf('Retrying message #%s failed. Requeuing it now.', $AMQPEnvelope->getMessageId()), array( + 'retryException' => $retryException, + 'exception' => $e, + )); - throw $e; + $retried = false; + } + + if (!$retried) { + $this->connection->nack($AMQPEnvelope, AMQP_REQUEUE); + + throw $e; + } + + // Acknowledge current message as another one as been requeued. + $this->connection->ack($AMQPEnvelope); } finally { if (\function_exists('pcntl_signal_dispatch')) { pcntl_signal_dispatch(); @@ -73,6 +95,9 @@ public function receive(callable $handler): void } } + /** + * {@inheritdoc} + */ public function stop(): void { $this->shouldStop = true; diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php index 3edefd0ab1c8a..d97dce3a3a23d 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; @@ -26,12 +27,14 @@ class AmqpTransport implements TransportInterface private $connection; private $receiver; private $sender; + private $logger; - public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection) + public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, Connection $connection, LoggerInterface $logger = null) { $this->encoder = $encoder; $this->decoder = $decoder; $this->connection = $connection; + $this->logger = $logger; } /** @@ -60,7 +63,7 @@ public function send(Envelope $envelope): void private function getReceiver() { - return $this->receiver = new AmqpReceiver($this->decoder, $this->connection); + return $this->receiver = new AmqpReceiver($this->decoder, $this->connection, $this->logger); } private function getSender() diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php index 29fb4ae4aa3e0..88f547285f97a 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransportFactory.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\Transport\AmqpExt; +use Psr\Log\LoggerInterface; use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface; use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface; use Symfony\Component\Messenger\Transport\TransportFactoryInterface; @@ -24,17 +25,19 @@ class AmqpTransportFactory implements TransportFactoryInterface private $encoder; private $decoder; private $debug; + private $logger; - public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug) + public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, bool $debug, LoggerInterface $logger = null) { $this->encoder = $encoder; $this->decoder = $decoder; $this->debug = $debug; + $this->logger = $logger; } public function createTransport(string $dsn, array $options): TransportInterface { - return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug)); + return new AmqpTransport($this->encoder, $this->decoder, Connection::fromDsn($dsn, $options, $this->debug), $this->logger); } public function supports(string $dsn, array $options): bool diff --git a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php index 6f50bd76d5e65..0c1baf40be68b 100644 --- a/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php +++ b/src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php @@ -18,7 +18,10 @@ */ class Connection { - private $connectionCredentials; + private const DEFAULT_MESSAGE_TTL_IN_MILLI_SECONDS = 10000; + private const DEFAULT_MAX_ATTEMPTS = 3; + + private $connectionConfiguration; private $exchangeConfiguration; private $queueConfiguration; private $debug; @@ -39,9 +42,50 @@ class Connection */ private $amqpQueue; - public function __construct(array $connectionCredentials, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null) + /** + * @var \AMQPExchange|null + */ + private $amqpRetryExchange; + + /** + * Available options: + * + * * host: Hostname of the AMQP service + * * port: Port of the AMQP service + * * vhost: Virtual Host to use with the AMQP service + * * user: Username to use to connect the the AMQP service + * * password: Username to use the connect to the AMQP service + * * queue: + * * name: Name of the queue + * * routing_key: The routing key (if any) to use to push the messages to + * * flags: Queue flags (Default: AMQP_DURABLE) + * * arguments: Extra arguments + * * exchange: + * * name: Name of the exchange + * * type: Type of exchange (Default: fanout) + * * flags: Exchange flags (Default: AMQP_DURABLE) + * * arguments: Extra arguments + * * retry: + * * attempts: Number of times it will try to retry + * * routing_key_pattern: The pattern of the routing key (Default: "attempt_%attempt%") + * * dead_queue: Name of the queue in which messages that retry more than attempts time are pushed to + * * dead_routing_key: Routing key name for the dead queue (Default: "dead") + * * queue_name_pattern: Pattern to use to create the queues (Default: "retry_queue_%attempt%") + * * exchange_name: Name of the exchange to be used for the retried messages (Default: "retry") + * * ttl: Key-value pairs of attempt number -> seconds to wait. If not configured, 10 seconds will be waited each attempt. + * * auto-setup: Enable or not the auto-setup of queues and exchanges (Default: true) + * * loop_sleep: Amount of micro-seconds to wait if no message are available (Default: 200000) + */ + public function __construct(array $connectionConfiguration, array $exchangeConfiguration, array $queueConfiguration, bool $debug = false, AmqpFactory $amqpFactory = null) { - $this->connectionCredentials = $connectionCredentials; + $this->connectionConfiguration = array_replace_recursive(array( + 'retry' => array( + 'routing_key_pattern' => 'attempt_%attempt%', + 'dead_routing_key' => 'dead', + 'exchange_name' => 'retry', + 'queue_name_pattern' => 'retry_queue_%attempt%', + ), + ), $connectionConfiguration); $this->debug = $debug; $this->exchangeConfiguration = $exchangeConfiguration; $this->queueConfiguration = $queueConfiguration; @@ -101,6 +145,108 @@ public function publish(string $body, array $headers = array()): void $this->exchange()->publish($body, null, AMQP_NOPARAM, array('headers' => $headers)); } + /** + * @throws \AMQPException + */ + public function publishForRetry(\AMQPEnvelope $message): bool + { + if (!isset($this->connectionConfiguration['retry'])) { + return false; + } + + $retryConfiguration = $this->connectionConfiguration['retry']; + $attemptNumber = ((int) $message->getHeader('symfony-messenger-attempts') ?: 0) + 1; + + if ($this->shouldSetup()) { + $this->setupRetry($retryConfiguration, $attemptNumber); + } + + $maximumAttempts = $retryConfiguration['attempts'] ?? self::DEFAULT_MAX_ATTEMPTS; + $routingKey = str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern']); + + if ($attemptNumber > $maximumAttempts) { + if (!isset($retryConfiguration['dead_queue'])) { + return false; + } + + $routingKey = $retryConfiguration['dead_routing_key']; + } + + $retriedMessageAttributes = array( + 'headers' => array_merge($message->getHeaders(), array('symfony-messenger-attempts' => (string) $attemptNumber)), + ); + + if ($deliveryMode = $message->getDeliveryMode()) { + $retriedMessageAttributes['delivery_mode'] = $deliveryMode; + } + if ($userId = $message->getUserId()) { + $retriedMessageAttributes['user_id'] = $userId; + } + if (null !== $priority = $message->getPriority()) { + $retriedMessageAttributes['priority'] = $priority; + } + if ($replyTo = $message->getReplyTo()) { + $retriedMessageAttributes['reply_to'] = $replyTo; + } + + $this->retryExchange($retryConfiguration)->publish( + $message->getBody(), + $routingKey, + AMQP_NOPARAM, + $retriedMessageAttributes + ); + + return true; + } + + private function setupRetry(array $retryConfiguration, int $attemptNumber) + { + if (!$this->channel()->isConnected()) { + $this->clear(); + } + + $exchange = $this->retryExchange($retryConfiguration); + $exchange->declareExchange(); + + $queue = $this->retryQueue($retryConfiguration, $attemptNumber); + $queue->declareQueue(); + $queue->bind($exchange->getName(), str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern'])); + + if (isset($retryConfiguration['dead_queue'])) { + $queue = $this->amqpFactory->createQueue($this->channel()); + $queue->setName($retryConfiguration['dead_queue']); + $queue->declareQueue(); + $queue->bind($exchange->getName(), $retryConfiguration['dead_routing_key']); + } + } + + private function retryExchange(array $retryConfiguration): \AMQPExchange + { + if (null === $this->amqpRetryExchange) { + $this->amqpRetryExchange = $this->amqpFactory->createExchange($this->channel()); + $this->amqpRetryExchange->setName($retryConfiguration['exchange_name']); + $this->amqpRetryExchange->setType(AMQP_EX_TYPE_DIRECT); + } + + return $this->amqpRetryExchange; + } + + private function retryQueue(array $retryConfiguration, int $attemptNumber) + { + $queue = $this->amqpFactory->createQueue($this->channel()); + $queue->setName(str_replace('%attempt%', $attemptNumber, $retryConfiguration['queue_name_pattern'])); + $queue->setArguments(array( + 'x-message-ttl' => $retryConfiguration['ttl'][$attemptNumber - 1] ?? self::DEFAULT_MESSAGE_TTL_IN_MILLI_SECONDS, + 'x-dead-letter-exchange' => $this->exchange()->getName(), + )); + + if (isset($this->queueConfiguration['routing_key'])) { + $queue->setArgument('x-dead-letter-routing-key', $this->queueConfiguration['routing_key']); + } + + return $queue; + } + /** * Waits and gets a message from the configured queue. * @@ -160,8 +306,8 @@ public function setup(): void public function channel(): \AMQPChannel { if (null === $this->amqpChannel) { - $connection = $this->amqpFactory->createConnection($this->connectionCredentials); - $connectMethod = 'true' === ($this->connectionCredentials['persistent'] ?? 'false') ? 'pconnect' : 'connect'; + $connection = $this->amqpFactory->createConnection($this->connectionConfiguration); + $connectMethod = 'true' === ($this->connectionConfiguration['persistent'] ?? 'false') ? 'pconnect' : 'connect'; if (false === $connection->{$connectMethod}()) { throw new \AMQPException('Could not connect to the AMQP server. Please verify the provided DSN.'); @@ -204,9 +350,9 @@ public function exchange(): \AMQPExchange return $this->amqpExchange; } - public function getConnectionCredentials(): array + public function getConnectionConfiguration(): array { - return $this->connectionCredentials; + return $this->connectionConfiguration; } private function clear(): void @@ -218,6 +364,6 @@ private function clear(): void private function shouldSetup(): bool { - return !array_key_exists('auto-setup', $this->connectionCredentials) || !\in_array($this->connectionCredentials['auto-setup'], array(false, 'false'), true); + return !array_key_exists('auto-setup', $this->connectionConfiguration) || !\in_array($this->connectionConfiguration['auto-setup'], array(false, 'false'), true); } } From d32d7b6f4b26c4378deb2eeb95f8f01ac55de54c Mon Sep 17 00:00:00 2001 From: Samuel ROZE Date: Sun, 12 Aug 2018 18:31:35 +0100 Subject: [PATCH 2/2] Add the method name change in the CHANGELOG --- UPGRADE-4.2.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/UPGRADE-4.2.md b/UPGRADE-4.2.md index 670fe3c7cfc70..3d2ae32351735 100644 --- a/UPGRADE-4.2.md +++ b/UPGRADE-4.2.md @@ -88,7 +88,9 @@ Messenger --------- * The `handle` method of the `Symfony\Component\Messenger\Middleware\ValidationMiddleware` and `Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware` middlewares now requires an `Envelope` object to be given (because they implement the `EnvelopeAwareInterface`). When using these middleware with the provided `MessageBus`, you will not have to do anything. If you use the middlewares any other way, you can use `Envelope::wrap($message)` to create an envelope for your message. - + * The method `getConnectionCredentials` of the AMQP transport class `Symfony\Component\Messenger\Transport\AmqpExt\Connection` + has been renamed to `getConnectionConfiguration`. + Security --------