diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php index b1dda969fb49b..60b130b317194 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpSenderTest.php @@ -13,11 +13,14 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Amqp\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpSender; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpStamp; use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\TransportException; +use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; /** @@ -55,6 +58,48 @@ public function testItSendsTheEncodedMessageUsingARoutingKey() $sender->send($envelope); } + public function testItSendsTheEncodedMessageUsingARoutingKeyOnRetry() + { + $envelope = (new Envelope(new DummyMessage('Oy'))) + ->with($previousStamp = new AmqpStamp('rk')) + ->with(new AmqpReceivedStamp($amqpEnvelope = new \AMQPEnvelope(), 'queueName')) + ->with(new RedeliveryStamp(1)) + ; + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('publish')->with( + $encoded['body'], + $encoded['headers'], + 0, + AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, $previousStamp, 'queueName') + ); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + + public function testItSendsTheEncodedMessageUsingARoutingKeyOnFailure() + { + $envelope = (new Envelope(new DummyMessage('Oy'))) + ->with($stamp = new AmqpStamp('rk')) + ->with(new SentToFailureTransportStamp('originalReceiverName')) + ; + $encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]]; + + $serializer = $this->createMock(SerializerInterface::class); + $serializer->method('encode')->with($envelope)->willReturn($encoded); + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp); + + $sender = new AmqpSender($connection, $serializer); + $sender->send($envelope); + } + public function testItSendsTheEncodedMessageWithoutHeaders() { $envelope = new Envelope(new DummyMessage('Oy')); diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php index 0e57671c662b1..2ca900ada72f0 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php @@ -15,6 +15,7 @@ use Symfony\Component\Messenger\Exception\TransportException; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface; @@ -56,10 +57,14 @@ public function send(Envelope $envelope): Envelope $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); if ($amqpReceivedStamp instanceof AmqpReceivedStamp) { + $isRetry = null === $envelope->last(SentToFailureTransportStamp::class) + && null !== $envelope->last(RedeliveryStamp::class) + ; + $amqpStamp = AmqpStamp::createFromAmqpEnvelope( $amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp, - $envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null + $isRetry ? $amqpReceivedStamp->getQueueName() : null ); } diff --git a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php index 46e7818627224..aa311e615ec13 100644 --- a/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php +++ b/src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php @@ -11,13 +11,13 @@ namespace Symfony\Component\Messenger\Bridge\Amqp\Transport; -use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; +use Symfony\Component\Messenger\Stamp\StampInterface; /** * @author Guillaume Gammelin * @author Samuel Roze */ -final class AmqpStamp implements NonSendableStampInterface +final class AmqpStamp implements StampInterface { private bool $isRetryAttempt = false;