Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 102bf8e

Browse filesBrowse files
committed
Added ability to distinguish retry and delay actions so that different "x-dead-letter-exchange" exchange name will be used in different scenarios
1 parent 75e71e3 commit 102bf8e
Copy full SHA for 102bf8e

File tree

5 files changed

+149
-53
lines changed
Filter options

5 files changed

+149
-53
lines changed

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/AmqpExtIntegrationTest.php
+55Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use Symfony\Component\Messenger\Bridge\Amqp\Transport\Connection;
2121
use Symfony\Component\Messenger\Envelope;
2222
use Symfony\Component\Messenger\Stamp\DelayStamp;
23+
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2324
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2425
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2526
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -136,6 +137,45 @@ public function testRetryAndDelay()
136137
$receiver->ack($envelope);
137138
}
138139

140+
public function testRetryAffectsOnlyOriginalQueue()
141+
{
142+
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
143+
'exchange' => [
144+
'name' => 'messages_topic',
145+
'type' => 'topic',
146+
'default_publish_routing_key' => 'topic_routing_key',
147+
],
148+
'queues' => [
149+
'A' => ['binding_keys' => ['topic_routing_key']],
150+
'B' => ['binding_keys' => ['topic_routing_key']],
151+
],
152+
]);
153+
$connection->setup();
154+
$connection->purgeQueues();
155+
156+
$serializer = $this->createSerializer();
157+
$sender = new AmqpSender($connection, $serializer);
158+
$receiver = new AmqpReceiver($connection, $serializer);
159+
160+
// initial delivery: should receive in both queues
161+
$sender->send(new Envelope(new DummyMessage('Payload')));
162+
163+
$receivedEnvelopes = $this->receiveWithQueueName($receiver);
164+
$this->assertCount(2, $receivedEnvelopes);
165+
$this->assertArrayHasKey('A', $receivedEnvelopes);
166+
$this->assertArrayHasKey('B', $receivedEnvelopes);
167+
168+
// redelivery: should receive in only "A" queue
169+
$retryEnvelope = $receivedEnvelopes['A']
170+
->with(new DelayStamp(10))
171+
->with(new RedeliveryStamp(1));
172+
$sender->send($retryEnvelope);
173+
174+
$redelivedEnvelopes = $this->receiveWithQueueName($receiver);
175+
$this->assertCount(1, $redelivedEnvelopes);
176+
$this->assertArrayHasKey('A', $redelivedEnvelopes);
177+
}
178+
139179
public function testItReceivesSignals()
140180
{
141181
$serializer = $this->createSerializer();
@@ -248,4 +288,19 @@ private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): ar
248288

249289
return $envelopes;
250290
}
291+
292+
private function receiveWithQueueName(AmqpReceiver $receiver)
293+
{
294+
// let RabbitMQ receive messages
295+
usleep(100 * 1000); // 100ms
296+
297+
$receivedEnvelopes = [];
298+
foreach ($receiver->get() as $envelope) {
299+
$queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName();
300+
$receivedEnvelopes[$queueName] = $envelope;
301+
$receiver->ack($envelope);
302+
}
303+
304+
return $receivedEnvelopes;
305+
}
251306
}

‎src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Tests/Transport/ConnectionTest.php
+54-33Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -413,36 +413,26 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()
413413

414414
public function testItDelaysTheMessage()
415415
{
416-
$amqpConnection = $this->createMock(\AMQPConnection::class);
417-
$amqpChannel = $this->createMock(\AMQPChannel::class);
418-
419-
$factory = $this->createMock(AmqpFactory::class);
420-
$factory->method('createConnection')->willReturn($amqpConnection);
421-
$factory->method('createChannel')->willReturn($amqpChannel);
422-
$factory->method('createQueue')->will($this->onConsecutiveCalls(
423-
$this->createMock(\AMQPQueue::class),
424-
$delayQueue = $this->createMock(\AMQPQueue::class)
425-
));
426-
$factory->method('createExchange')->will($this->onConsecutiveCalls(
427-
$this->createMock(\AMQPExchange::class),
428-
$delayExchange = $this->createMock(\AMQPExchange::class)
429-
));
416+
$delayExchange = $this->createMock(\AMQPExchange::class);
417+
$delayExchange->expects($this->once())
418+
->method('publish')
419+
->with('{}', 'delay_messages__delay_5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
420+
$connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__delay_5000');
430421

431-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
432-
$delayQueue->expects($this->once())->method('setArguments')->with([
433-
'x-message-ttl' => 5000,
434-
'x-expires' => 5000 + 10000,
435-
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
436-
'x-dead-letter-routing-key' => '',
437-
]);
438-
439-
$delayQueue->expects($this->once())->method('declareQueue');
440-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
422+
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
423+
}
441424

442-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
425+
public function testItRetriesTheMessage()
426+
{
427+
$delayExchange = $this->createMock(\AMQPExchange::class);
428+
$delayExchange->expects($this->once())
429+
->method('publish')
430+
->with('{}', 'delay_messages__retry_5000', AMQP_NOPARAM);
431+
$connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__retry_5000');
443432

444-
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
445-
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
433+
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
434+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '');
435+
$connection->publish('{}', [], 5000, $amqpStamp);
446436
}
447437

448438
public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
@@ -470,7 +460,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
470460

471461
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
472462

473-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
463+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__delay_120000');
474464
$delayQueue->expects($this->once())->method('setArguments')->with([
475465
'x-message-ttl' => 120000,
476466
'x-expires' => 120000 + 10000,
@@ -479,9 +469,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
479469
]);
480470

481471
$delayQueue->expects($this->once())->method('declareQueue');
482-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
472+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__delay_120000');
483473

484-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
474+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__delay_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
485475
$connection->publish('{}', [], 120000);
486476
}
487477

@@ -589,7 +579,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
589579

590580
$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);
591581

592-
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
582+
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_delay_120000');
593583
$delayQueue->expects($this->once())->method('setArguments')->with([
594584
'x-message-ttl' => 120000,
595585
'x-expires' => 120000 + 10000,
@@ -598,9 +588,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
598588
]);
599589

600590
$delayQueue->expects($this->once())->method('declareQueue');
601-
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
591+
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_delay_120000');
602592

603-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
593+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_delay_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
604594
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
605595
}
606596

@@ -623,6 +613,37 @@ public function testItCanPublishWithCustomFlagsAndAttributes()
623613
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
624614
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
625615
}
616+
617+
private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
618+
{
619+
$amqpConnection = $this->createMock(\AMQPConnection::class);
620+
$amqpChannel = $this->createMock(\AMQPChannel::class);
621+
622+
$factory = $this->createMock(AmqpFactory::class);
623+
$factory->method('createConnection')->willReturn($amqpConnection);
624+
$factory->method('createChannel')->willReturn($amqpChannel);
625+
$factory->method('createQueue')->will($this->onConsecutiveCalls(
626+
$this->createMock(\AMQPQueue::class),
627+
$delayQueue = $this->createMock(\AMQPQueue::class)
628+
));
629+
$factory->method('createExchange')->will($this->onConsecutiveCalls(
630+
$this->createMock(\AMQPExchange::class),
631+
$delayExchange
632+
));
633+
634+
$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
635+
$delayQueue->expects($this->once())->method('setArguments')->with([
636+
'x-message-ttl' => 5000,
637+
'x-expires' => 5000 + 10000,
638+
'x-dead-letter-exchange' => $deadLetterExchangeName,
639+
'x-dead-letter-routing-key' => '',
640+
]);
641+
642+
$delayQueue->expects($this->once())->method('declareQueue');
643+
$delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName);
644+
645+
return Connection::fromDsn('amqp://localhost', [], $factory);
646+
}
626647
}
627648

628649
class TestAmqpFactory extends AmqpFactory

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpSender.php
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\TransportException;
1616
use Symfony\Component\Messenger\Stamp\DelayStamp;
17+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1718
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -58,7 +59,11 @@ public function send(Envelope $envelope): Envelope
5859

5960
$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
6061
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
61-
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
62+
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
63+
$amqpReceivedStamp->getAmqpEnvelope(),
64+
$amqpStamp,
65+
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
66+
);
6267
}
6368

6469
try {

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/AmqpStamp.php
+16-2Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Amqp\Transport;
1313

1414
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
15+
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
1516

1617
/**
1718
* @author Guillaume Gammelin <ggammelin@gmail.com>
@@ -22,6 +23,7 @@ final class AmqpStamp implements NonSendableStampInterface
2223
private $routingKey;
2324
private $flags;
2425
private $attributes;
26+
private $isEnvelopeRedelivered = false;
2527

2628
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
2729
{
@@ -45,7 +47,12 @@ public function getAttributes(): array
4547
return $this->attributes;
4648
}
4749

48-
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
50+
public function isEnvelopeRedelivered(): bool
51+
{
52+
return $this->isEnvelopeRedelivered;
53+
}
54+
55+
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $redeliveryRoutingKey = null): self
4956
{
5057
$attr = $previousStamp->attributes ?? [];
5158

@@ -62,7 +69,14 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
6269
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
6370
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();
6471

65-
return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
72+
if (is_null($redeliveryRoutingKey)) {
73+
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
74+
} else {
75+
$stamp = new self($redeliveryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr);
76+
$stamp->isEnvelopeRedelivered = true;
77+
}
78+
79+
return $stamp;
6680
}
6781

6882
public static function createWithAttributes(array $attributes, self $previousStamp = null): self

‎src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Amqp/Transport/Connection.php
+18-17Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
107107
$this->connectionOptions = array_replace_recursive([
108108
'delay' => [
109109
'exchange_name' => 'delays',
110-
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%delay%',
110+
'queue_name_pattern' => 'delay_%exchange_name%_%routing_key%_%action%_%delay%',
111111
],
112112
], $connectionOptions);
113113
$this->exchangeOptions = $exchangeOptions;
@@ -140,7 +140,7 @@ public function __construct(array $connectionOptions, array $exchangeOptions, ar
140140
* * flags: Exchange flags (Default: AMQP_DURABLE)
141141
* * arguments: Extra arguments
142142
* * delay:
143-
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%delay%")
143+
* * queue_name_pattern: Pattern to use to create the queues (Default: "delay_%exchange_name%_%routing_key%_%action%_%delay%")
144144
* * exchange_name: Name of the exchange to be used for the delayed/retried messages (Default: "delays")
145145
* * auto_setup: Enable or not the auto-setup of queues and exchanges (Default: true)
146146
* * prefetch_count: set channel prefetch count
@@ -300,13 +300,14 @@ public function countMessagesInQueues(): int
300300
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
301301
{
302302
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
303+
$isRetryAttempt = $amqpStamp ? $amqpStamp->isEnvelopeRedelivered() : false;
303304

304-
$this->setupDelay($delay, $routingKey);
305+
$this->setupDelay($delay, $routingKey, $isRetryAttempt);
305306

306307
$this->publishOnExchange(
307308
$this->getDelayExchange(),
308309
$body,
309-
$this->getRoutingKeyForDelay($delay, $routingKey),
310+
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
310311
$headers,
311312
$amqpStamp
312313
);
@@ -326,15 +327,15 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
326327
);
327328
}
328329

329-
private function setupDelay(int $delay, ?string $routingKey)
330+
private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
330331
{
331332
if ($this->shouldSetup()) {
332333
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
333334
}
334335

335-
$queue = $this->createDelayQueue($delay, $routingKey);
336+
$queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
336337
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
337-
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
338+
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
338339
}
339340

340341
private function getDelayExchange(): \AMQPExchange
@@ -358,21 +359,19 @@ private function getDelayExchange(): \AMQPExchange
358359
* which is the original exchange, resulting on it being put back into
359360
* the original queue.
360361
*/
361-
private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
362+
private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
362363
{
363364
$queue = $this->amqpFactory->createQueue($this->channel());
364-
$queue->setName(str_replace(
365-
['%delay%', '%exchange_name%', '%routing_key%'],
366-
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
367-
$this->connectionOptions['delay']['queue_name_pattern']
368-
));
365+
$queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
369366
$queue->setFlags(AMQP_DURABLE);
370367
$queue->setArguments([
371368
'x-message-ttl' => $delay,
372369
// delete the delay queue 10 seconds after the message expires
373370
// publishing another message redeclares the queue which renews the lease
374371
'x-expires' => $delay + 10000,
375-
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
372+
// message should be broadcasted to all consumers during delay, but to only one queue during retry
373+
// empty name is default direct exchange
374+
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
376375
// after being released from to DLX, make sure the original routing key will be used
377376
// we must use an empty string instead of null for the argument to be picked up
378377
'x-dead-letter-routing-key' => $routingKey ?? '',
@@ -381,11 +380,13 @@ private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
381380
return $queue;
382381
}
383382

384-
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
383+
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
385384
{
385+
$action = $isRetryAttempt ? 'retry' : 'delay';
386+
386387
return str_replace(
387-
['%delay%', '%exchange_name%', '%routing_key%'],
388-
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
388+
['%delay%', '%exchange_name%', '%routing_key%', '%action%'],
389+
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? '', $action],
389390
$this->connectionOptions['delay']['queue_name_pattern']
390391
);
391392
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.