Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Ability to distinguish retry and delay actions #36864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 1 src/Symfony/Component/Messenger/Bridge/Amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ CHANGELOG

* Deprecated the `prefetch_count` parameter, it has no effect and will be removed in Symfony 6.0.
* `AmqpReceiver` implements `QueueReceiverInterface` to fetch messages from a specific set of queues.
* Add ability to distinguish retry and delay actions

5.2.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,45 @@ public function testRetryAndDelay()
$receiver->ack($envelope);
}

public function testRetryAffectsOnlyOriginalQueue()
{
$connection = Connection::fromDsn(getenv('MESSENGER_AMQP_DSN'), [
'exchange' => [
'name' => 'messages_topic',
'type' => 'topic',
'default_publish_routing_key' => 'topic_routing_key',
],
'queues' => [
'A' => ['binding_keys' => ['topic_routing_key']],
'B' => ['binding_keys' => ['topic_routing_key']],
],
]);
$connection->setup();
$connection->purgeQueues();

$serializer = $this->createSerializer();
$sender = new AmqpSender($connection, $serializer);
$receiver = new AmqpReceiver($connection, $serializer);

// initial delivery: should receive in both queues
$sender->send(new Envelope(new DummyMessage('Payload')));

$receivedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(2, $receivedEnvelopes);
$this->assertArrayHasKey('A', $receivedEnvelopes);
$this->assertArrayHasKey('B', $receivedEnvelopes);

// retry: should receive in only "A" queue
$retryEnvelope = $receivedEnvelopes['A']
->with(new DelayStamp(10))
->with(new RedeliveryStamp(1));
$sender->send($retryEnvelope);

$retriedEnvelopes = $this->receiveWithQueueName($receiver);
$this->assertCount(1, $retriedEnvelopes);
$this->assertArrayHasKey('A', $retriedEnvelopes);
}

public function testItReceivesSignals()
{
$serializer = $this->createSerializer();
Expand Down Expand Up @@ -255,4 +294,19 @@ private function receiveEnvelopes(ReceiverInterface $receiver, int $timeout): ar

return $envelopes;
}

private function receiveWithQueueName(AmqpReceiver $receiver)
{
// let RabbitMQ receive messages
usleep(100 * 1000); // 100ms

$receivedEnvelopes = [];
foreach ($receiver->get() as $envelope) {
$queueName = $envelope->last(AmqpReceivedStamp::class)->getQueueName();
$receivedEnvelopes[$queueName] = $envelope;
$receiver->ack($envelope);
}

return $receivedEnvelopes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,36 +493,30 @@ public function testAutoSetupWithDelayDeclaresExchangeQueuesAndDelay()

public function testItDelaysTheMessage()
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange = $this->createMock(\AMQPExchange::class)
));
$delayExchange = $this->createMock(\AMQPExchange::class);
$delayExchange->expects($this->once())
->method('publish')
->with('{}', 'delay_messages__5000_delay', AMQP_NOPARAM, [
'headers' => ['x-some-headers' => 'foo'],
'delivery_mode' => 2,
'timestamp' => time(),
]);
$connection = $this->createDelayOrRetryConnection($delayExchange, self::DEFAULT_EXCHANGE_NAME, 'delay_messages__5000_delay');

$delayQueue->expects($this->once())->method('setName')->with('delay_messages__5000');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => self::DEFAULT_EXCHANGE_NAME,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
}

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', \AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2, 'timestamp' => time()]);
public function testItRetriesTheMessage()
{
$delayExchange = $this->createMock(\AMQPExchange::class);
$delayExchange->expects($this->once())
->method('publish')
->with('{}', 'delay_messages__5000_retry', AMQP_NOPARAM);
$connection = $this->createDelayOrRetryConnection($delayExchange, '', 'delay_messages__5000_retry');

$connection = Connection::fromDsn('amqp://localhost', [], $factory);
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
$amqpEnvelope = $this->createMock(\AMQPEnvelope::class);
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpEnvelope, null, '');
$connection->publish('{}', [], 5000, $amqpStamp);
}

public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
Expand Down Expand Up @@ -550,7 +544,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages__120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
Expand All @@ -559,9 +553,9 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000_delay');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000);
}

Expand Down Expand Up @@ -690,7 +684,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument

$connection = Connection::fromDsn('amqp://localhost', $connectionOptions, $factory);

$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('setName')->with('delay_messages_routing_key_120000_delay');
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 120000,
'x-expires' => 120000 + 10000,
Expand All @@ -699,9 +693,9 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000_delay');

$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000_delay', \AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2, 'timestamp' => time()]);
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
}

Expand Down Expand Up @@ -769,6 +763,37 @@ public function testItCanPublishAndWaitForConfirmation()
$connection = Connection::fromDsn('amqp://localhost?confirm_timeout=0.5', [], $factory);
$connection->publish('body');
}

private function createDelayOrRetryConnection(\AMQPExchange $delayExchange, string $deadLetterExchangeName, string $delayQueueName): Connection
{
$amqpConnection = $this->createMock(\AMQPConnection::class);
$amqpChannel = $this->createMock(\AMQPChannel::class);

$factory = $this->createMock(AmqpFactory::class);
$factory->method('createConnection')->willReturn($amqpConnection);
$factory->method('createChannel')->willReturn($amqpChannel);
$factory->method('createQueue')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPQueue::class),
$delayQueue = $this->createMock(\AMQPQueue::class)
));
$factory->method('createExchange')->will($this->onConsecutiveCalls(
$this->createMock(\AMQPExchange::class),
$delayExchange
));

$delayQueue->expects($this->once())->method('setName')->with($delayQueueName);
$delayQueue->expects($this->once())->method('setArguments')->with([
'x-message-ttl' => 5000,
'x-expires' => 5000 + 10000,
'x-dead-letter-exchange' => $deadLetterExchangeName,
'x-dead-letter-routing-key' => '',
]);

$delayQueue->expects($this->once())->method('declareQueue');
$delayQueue->expects($this->once())->method('bind')->with('delays', $delayQueueName);

return Connection::fromDsn('amqp://localhost', [], $factory);
}
}

class TestAmqpFactory extends AmqpFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
Expand Down Expand Up @@ -58,7 +59,11 @@ public function send(Envelope $envelope): Envelope

$amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class);
if ($amqpReceivedStamp instanceof AmqpReceivedStamp) {
$amqpStamp = AmqpStamp::createFromAmqpEnvelope($amqpReceivedStamp->getAmqpEnvelope(), $amqpStamp);
$amqpStamp = AmqpStamp::createFromAmqpEnvelope(
$amqpReceivedStamp->getAmqpEnvelope(),
$amqpStamp,
$envelope->last(RedeliveryStamp::class) ? $amqpReceivedStamp->getQueueName() : null
);
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ final class AmqpStamp implements NonSendableStampInterface
private $routingKey;
private $flags;
private $attributes;
private $isRetryAttempt = false;

public function __construct(string $routingKey = null, int $flags = \AMQP_NOPARAM, array $attributes = [])
{
Expand All @@ -45,7 +46,7 @@ public function getAttributes(): array
return $this->attributes;
}

public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null): self
public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self $previousStamp = null, string $retryRoutingKey = null): self
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are using $amqpReceivedStamp->getQueueName() to populate the $retryRoutingKey, is that really correct?

A routing key is not the same as the queue name.

(It is probably correct, but I just need you to clarify.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct. We want to distinguish:

  • delay scenario, where message should be sent to original exchange to original routing key (so that all consumers can get it)
  • retry scenario, where message should be sent to default (direct) exchange to routing key which matches queue name (in this case it will be received in one queue only)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we sure the routing key always matches the queue name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For direct exchange in AMQP 0-9-1 yes:

The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.

See https://www.rabbitmq.com/tutorials/amqp-concepts.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nyholm can you review please?

{
$attr = $previousStamp->attributes ?? [];

Expand All @@ -62,7 +63,19 @@ public static function createFromAmqpEnvelope(\AMQPEnvelope $amqpEnvelope, self
$attr['type'] = $attr['type'] ?? $amqpEnvelope->getType();
$attr['reply_to'] = $attr['reply_to'] ?? $amqpEnvelope->getReplyTo();

return new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? \AMQP_NOPARAM, $attr);
if (null === $retryRoutingKey) {
$stamp = new self($previousStamp->routingKey ?? $amqpEnvelope->getRoutingKey(), $previousStamp->flags ?? AMQP_NOPARAM, $attr);
} else {
$stamp = new self($retryRoutingKey, $previousStamp->flags ?? AMQP_NOPARAM, $attr);
$stamp->isRetryAttempt = true;
}

return $stamp;
}

public function isRetryAttempt(): bool
{
return $this->isRetryAttempt;
}

public static function createWithAttributes(array $attributes, self $previousStamp = null): self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,13 +323,14 @@ public function countMessagesInQueues(): int
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
{
$routingKey = $this->getRoutingKeyForMessage($amqpStamp);
$isRetryAttempt = $amqpStamp ? $amqpStamp->isRetryAttempt() : false;

$this->setupDelay($delay, $routingKey);
$this->setupDelay($delay, $routingKey, $isRetryAttempt);

$this->publishOnExchange(
$this->getDelayExchange(),
$body,
$this->getRoutingKeyForDelay($delay, $routingKey),
$this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt),
$headers,
$amqpStamp
);
Expand All @@ -354,15 +355,15 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
}
}

private function setupDelay(int $delay, ?string $routingKey)
private function setupDelay(int $delay, ?string $routingKey, bool $isRetryAttempt)
{
if ($this->autoSetup) {
$this->setup(); // setup delay exchange and normal exchange for delay queue to DLX messages to
}

$queue = $this->createDelayQueue($delay, $routingKey);
$queue = $this->createDelayQueue($delay, $routingKey, $isRetryAttempt);
$queue->declareQueue(); // the delay queue always need to be declared because the name is dynamic and cannot be declared in advance
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey));
$queue->bind($this->connectionOptions['delay']['exchange_name'], $this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
}

private function getDelayExchange(): \AMQPExchange
Expand All @@ -386,21 +387,19 @@ private function getDelayExchange(): \AMQPExchange
* which is the original exchange, resulting on it being put back into
* the original queue.
*/
private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
private function createDelayQueue(int $delay, ?string $routingKey, bool $isRetryAttempt): \AMQPQueue
{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $routingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
));
$queue->setName($this->getRoutingKeyForDelay($delay, $routingKey, $isRetryAttempt));
$queue->setFlags(\AMQP_DURABLE);
$queue->setArguments([
'x-message-ttl' => $delay,
// delete the delay queue 10 seconds after the message expires
// publishing another message redeclares the queue which renews the lease
'x-expires' => $delay + 10000,
'x-dead-letter-exchange' => $this->exchangeOptions['name'],
// message should be broadcasted to all consumers during delay, but to only one queue during retry
// empty name is default direct exchange
'x-dead-letter-exchange' => $isRetryAttempt ? '' : $this->exchangeOptions['name'],
// after being released from to DLX, make sure the original routing key will be used
// we must use an empty string instead of null for the argument to be picked up
'x-dead-letter-routing-key' => $routingKey ?? '',
Expand All @@ -409,13 +408,15 @@ private function createDelayQueue(int $delay, ?string $routingKey): \AMQPQueue
return $queue;
}

private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey): string
private function getRoutingKeyForDelay(int $delay, ?string $finalRoutingKey, bool $isRetryAttempt): string
{
$action = $isRetryAttempt ? '_retry' : '_delay';

return str_replace(
['%delay%', '%exchange_name%', '%routing_key%'],
[$delay, $this->exchangeOptions['name'], $finalRoutingKey ?? ''],
$this->connectionOptions['delay']['queue_name_pattern']
);
).$action;
}

/**
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.