Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a1cb2ab

Browse filesBrowse files
committed
[Messenger][AMQP] Use delivery_mode=2 by default
1 parent 5690b97 commit a1cb2ab
Copy full SHA for a1cb2ab

File tree

2 files changed

+23
-7
lines changed
Filter options

2 files changed

+23
-7
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
+22-7Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ public function testItSetupsTheConnectionWithDefaults()
227227
);
228228

229229
$amqpExchange->expects($this->once())->method('declareExchange');
230-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
230+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
231231
$amqpQueue->expects($this->once())->method('declareQueue');
232232
$amqpQueue->expects($this->once())->method('bind')->with(self::DEFAULT_EXCHANGE_NAME, null);
233233

@@ -250,7 +250,7 @@ public function testItSetupsTheConnection()
250250
$factory->method('createQueue')->will($this->onConsecutiveCalls($amqpQueue0, $amqpQueue1));
251251

252252
$amqpExchange->expects($this->once())->method('declareExchange');
253-
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => []]);
253+
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
254254
$amqpQueue0->expects($this->once())->method('declareQueue');
255255
$amqpQueue0->expects($this->exactly(2))->method('bind')->withConsecutive(
256256
[self::DEFAULT_EXCHANGE_NAME, 'binding_key0'],
@@ -287,7 +287,7 @@ public function testBindingArguments()
287287
$factory->method('createQueue')->willReturn($amqpQueue);
288288

289289
$amqpExchange->expects($this->once())->method('declareExchange');
290-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => []]);
290+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
291291
$amqpQueue->expects($this->once())->method('declareQueue');
292292
$amqpQueue->expects($this->exactly(1))->method('bind')->withConsecutive(
293293
[self::DEFAULT_EXCHANGE_NAME, null, ['x-match' => 'all']]
@@ -400,7 +400,7 @@ public function testItDelaysTheMessage()
400400
$delayQueue->expects($this->once())->method('declareQueue');
401401
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__5000');
402402

403-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo']]);
403+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__5000', AMQP_NOPARAM, ['headers' => ['x-some-headers' => 'foo'], 'delivery_mode' => 2]);
404404

405405
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
406406
$connection->publish('{}', ['x-some-headers' => 'foo'], 5000);
@@ -442,7 +442,7 @@ public function testItDelaysTheMessageWithADifferentRoutingKeyAndTTLs()
442442
$delayQueue->expects($this->once())->method('declareQueue');
443443
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages__120000');
444444

445-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => []]);
445+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages__120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
446446
$connection->publish('{}', [], 120000);
447447
}
448448

@@ -474,12 +474,27 @@ public function testAmqpStampHeadersAreUsed()
474474
$amqpExchange = $this->createMock(\AMQPExchange::class)
475475
);
476476

477-
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
477+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y'], 'delivery_mode' => 2]);
478478

479479
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
480480
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
481481
}
482482

483+
public function testAmqpStampDelireryModeIsUsed()
484+
{
485+
$factory = new TestAmqpFactory(
486+
$this->createMock(\AMQPConnection::class),
487+
$this->createMock(\AMQPChannel::class),
488+
$this->createMock(\AMQPQueue::class),
489+
$amqpExchange = $this->createMock(\AMQPExchange::class)
490+
);
491+
492+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 1]);
493+
494+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
495+
$connection->publish('body', [], 0, new AmqpStamp(null, AMQP_NOPARAM, ['delivery_mode' => 1]));
496+
}
497+
483498
public function testItCanPublishWithTheDefaultRoutingKey()
484499
{
485500
$factory = new TestAmqpFactory(
@@ -546,7 +561,7 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
546561
$delayQueue->expects($this->once())->method('declareQueue');
547562
$delayQueue->expects($this->once())->method('bind')->with('delays', 'delay_messages_routing_key_120000');
548563

549-
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => []]);
564+
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_messages_routing_key_120000', AMQP_NOPARAM, ['headers' => [], 'delivery_mode' => 2]);
550565
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
551566
}
552567

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ private function publishOnExchange(\AMQPExchange $exchange, string $body, string
230230
{
231231
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
232232
$attributes['headers'] = array_merge($attributes['headers'] ?? [], $headers);
233+
$attributes['delivery_mode'] = $attributes['delivery_mode'] ?? 2;
233234

234235
$exchange->publish(
235236
$body,

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.