Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4e1b8fc

Browse filesBrowse files
author
Fabien Perroquin
committed
[Messenger][Amqp] Fix wrong routing key when use failure queue
1 parent 39f4f1a commit 4e1b8fc
Copy full SHA for 4e1b8fc

File tree

4 files changed

+33
-2
lines changed
Filter options

4 files changed

+33
-2
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,23 @@ private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, stri
804804

805805
return Connection::fromDsn('amqp://localhost', [], $factory);
806806
}
807+
808+
public function testForcePublishWithDefaultRoutingKey()
809+
{
810+
$factory = new TestAmqpFactory(
811+
$amqpConnection = $this->createMock(\AMQPConnection::class),
812+
$amqpChannel = $this->createMock(\AMQPChannel::class),
813+
$amqpQueue = $this->createMock(\AMQPQueue::class),
814+
$amqpExchange = $this->createMock(\AMQPExchange::class)
815+
);
816+
817+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'default_routing_key');
818+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
819+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '', true);
820+
821+
$connection = Connection::fromDsn('amqp://localhost?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
822+
$connection->publish('body', [], 0, $amqpStamp);
823+
}
807824
}
808825

809826
class TestAmqpFactory extends AmqpFactory

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php
+3-1Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
1717
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
18+
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
1819
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -59,7 +60,8 @@ public function send(Envelope $envelope): Envelope
5960
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
6061
$amqpReceivedStamp->getAmqpEnvelope(),
6162
$amqpStamp,
62-
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
63+
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null,
64+
$envelope->last(SentToFailureTransportStamp::class) ? true : false
6365
);
6466
}
6567

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ final class AmqpStamp implements NonSendableStampInterface
2323
private int $flags;
2424
private array $attributes;
2525
private bool $isRetryAttempt = false;
26+
private bool $forceDefaultRoutingKey = false;
2627

2728
public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
2829
{
@@ -46,7 +47,7 @@ public function getAttributes(): array
4647
return $this->attributes;
4748
}
4849

49-
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
50+
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null, bool $forceDefaultRoutingKey = false): self
5051
{
5152
$attr = $previousStamp->attributes ?? [];
5253

@@ -71,6 +72,8 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
7172
$stamp->isRetryAttempt = true;
7273
}
7374

75+
$stamp->forceDefaultRoutingKey = $forceDefaultRoutingKey;
76+
7477
return $stamp;
7578
}
7679

@@ -87,4 +90,9 @@ public static function createWithAttributes(array $attributes, self $previousSta
8790
array_merge($previousStamp->attributes ?? [], $attributes)
8891
);
8992
}
93+
94+
public function isForceDefaultRoutingKey(): bool
95+
{
96+
return $this->forceDefaultRoutingKey;
97+
}
9098
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,10 @@ public function purgeQueues(): void
563563

564564
private function getRoutingKeyForMessage(?AmqpStamp $amqpStamp): ?string
565565
{
566+
if ($amqpStamp?->isForceDefaultRoutingKey()) {
567+
return $this->getDefaultPublishRoutingKey();
568+
}
569+
566570
return $amqpStamp?->getRoutingKey() ?? $this->getDefaultPublishRoutingKey();
567571
}
568572
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.