Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit df31f55

Browse filesBrowse files
committed
Uses an AmqpStamp to provide flags and attributes
1 parent 09dee17 commit df31f55
Copy full SHA for df31f55

File tree

8 files changed

+110
-50
lines changed
Filter options

8 files changed

+110
-50
lines changed

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ CHANGELOG
1818
changed: a required 3rd `SerializerInterface` argument was added.
1919
* Added a new `SyncTransport` along with `ForceCallHandlersStamp` to
2020
explicitly handle messages synchronously.
21-
* Added `AmqpRoutingKeyStamp` allowing to provide a routing key on message publishing.
21+
* Added `AmqpStamp` allowing to provide a routing key, flags and attributes on message publishing.
2222
* [BC BREAK] Removed publishing with a `routing_key` option from queue configuration, for
23-
AMQP. Use exchange `default_publish_routing_key` or `AmqpRoutingKeyStamp` instead.
23+
AMQP. Use exchange `default_publish_routing_key` or `AmqpStamp` instead.
2424
* [BC BREAK] Changed the `queue` option in the AMQP transport DSN to be `queues[name]`. You can
2525
therefore name the queue but also configure `binding_keys`, `flags` and `arguments`.
2626
* [BC BREAK] The methods `get`, `ack`, `nack` and `queue` of the AMQP `Connection`

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpRoutingKeyStampTest.php
-24Lines changed: 0 additions & 24 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
17-
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpRoutingKeyStamp;
1817
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpSender;
18+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1919
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

@@ -41,14 +41,14 @@ public function testItSendsTheEncodedMessage()
4141

4242
public function testItSendsTheEncodedMessageUsingARoutingKey()
4343
{
44-
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new AmqpRoutingKeyStamp('rk'));
44+
$envelope = (new Envelope(new DummyMessage('Oy')))->with($stamp = new AmqpStamp('rk'));
4545
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4646

4747
$serializer = $this->createMock(SerializerInterface::class);
4848
$serializer->method('encode')->with($envelope)->willReturn($encoded);
4949

5050
$connection = $this->createMock(Connection::class);
51-
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, 'rk');
51+
$connection->expects($this->once())->method('publish')->with($encoded['body'], $encoded['headers'], 0, $stamp);
5252

5353
$sender = new AmqpSender($connection, $serializer);
5454
$sender->send($envelope);
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Tests\Transport\AmqpExt;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
16+
17+
class AmqpStampTest extends TestCase
18+
{
19+
public function testRoutingKeyOnly()
20+
{
21+
$stamp = new AmqpStamp('routing_key');
22+
$this->assertSame('routing_key', $stamp->getRoutingKey());
23+
$this->assertSame(AMQP_NOPARAM, $stamp->getFlags());
24+
$this->assertSame([], $stamp->getAttributes());
25+
}
26+
27+
public function testFlagsAndAttributes()
28+
{
29+
$stamp = new AmqpStamp(null, AMQP_DURABLE, ['delivery_mode' => 'unknown']);
30+
$this->assertNull($stamp->getRoutingKey());
31+
$this->assertSame(AMQP_DURABLE, $stamp->getFlags());
32+
$this->assertSame(['delivery_mode' => 'unknown'], $stamp->getAttributes());
33+
}
34+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php
+24-2Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
16+
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpFactory;
18+
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpStamp;
1719
use Symfony\Component\Messenger\Transport\AmqpExt\Connection;
1820

1921
/**
@@ -430,7 +432,7 @@ public function testItCanPublishWithASuppliedRoutingKey()
430432
$amqpExchange->expects($this->once())->method('publish')->with('body', 'routing_key');
431433

432434
$connection = Connection::fromDsn('amqp://localhost/%2f/messages?exchange[default_publish_routing_key]=default_routing_key', [], $factory);
433-
$connection->publish('body', [], 0, 'routing_key');
435+
$connection->publish('body', [], 0, new AmqpStamp('routing_key'));
434436
}
435437

436438
public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument()
@@ -477,7 +479,27 @@ public function testItDelaysTheMessageWithTheInitialSuppliedRoutingKeyAsArgument
477479
$delayQueue->expects($this->once())->method('bind')->with('delay', 'delay_120000');
478480

479481
$delayExchange->expects($this->once())->method('publish')->with('{}', 'delay_120000', AMQP_NOPARAM, ['headers' => []]);
480-
$connection->publish('{}', [], 120000, 'routing_key');
482+
$connection->publish('{}', [], 120000, new AmqpStamp('routing_key'));
483+
}
484+
485+
public function testItCanPublishWithCustomFlagsAndAttributes()
486+
{
487+
$factory = new TestAmqpFactory(
488+
$amqpConnection = $this->createMock(\AMQPConnection::class),
489+
$amqpChannel = $this->createMock(\AMQPChannel::class),
490+
$amqpQueue = $this->createMock(\AMQPQueue::class),
491+
$amqpExchange = $this->createMock(\AMQPExchange::class)
492+
);
493+
494+
$amqpExchange->expects($this->once())->method('publish')->with(
495+
'body',
496+
'routing_key',
497+
AMQP_IMMEDIATE,
498+
['delivery_mode' => 2, 'headers' => ['type' => DummyMessage::class]]
499+
);
500+
501+
$connection = Connection::fromDsn('amqp://localhost/%2f/messages', [], $factory);
502+
$connection->publish('body', ['type' => DummyMessage::class], 0, new AmqpStamp('routing_key', AMQP_IMMEDIATE, ['delivery_mode' => 2]));
481503
}
482504
}
483505

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php
+6-5Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,12 @@ public function send(Envelope $envelope): Envelope
5151
}
5252

5353
try {
54-
/** @var $routingKeyStamp AmqpRoutingKeyStamp */
55-
$routingKeyStamp = $envelope->last(AmqpRoutingKeyStamp::class);
56-
$routingKey = $routingKeyStamp ? $routingKeyStamp->getRoutingKey() : null;
57-
58-
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delay, $routingKey);
54+
$this->connection->publish(
55+
$encodedMessage['body'],
56+
$encodedMessage['headers'] ?? [],
57+
$delay,
58+
$envelope->last(AmqpStamp::class)
59+
);
5960
} catch (\AMQPException $e) {
6061
throw new TransportException($e->getMessage(), 0, $e);
6162
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpRoutingKeyStamp.php renamed to ‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php
+18-3Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,35 @@
1515

1616
/**
1717
* @author Guillaume Gammelin <ggammelin@gmail.com>
18+
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*
1920
* @experimental in 4.3
2021
*/
21-
final class AmqpRoutingKeyStamp implements StampInterface
22+
final class AmqpStamp implements StampInterface
2223
{
2324
private $routingKey;
25+
private $flags;
26+
private $attributes;
2427

25-
public function __construct(string $routingKey)
28+
public function __construct(string $routingKey = null, int $flags = AMQP_NOPARAM, array $attributes = [])
2629
{
2730
$this->routingKey = $routingKey;
31+
$this->flags = $flags;
32+
$this->attributes = $attributes;
2833
}
2934

30-
public function getRoutingKey(): string
35+
public function getRoutingKey(): ?string
3136
{
3237
return $this->routingKey;
3338
}
39+
40+
public function getFlags(): int
41+
{
42+
return $this->flags;
43+
}
44+
45+
public function getAttributes(): array
46+
{
47+
return $this->attributes;
48+
}
3449
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
+23-11Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ private static function normalizeQueueArguments(array $arguments): array
171171
*
172172
* @throws \AMQPException
173173
*/
174-
public function publish(string $body, array $headers = [], int $delay = 0, string $routingKey = null): void
174+
public function publish(string $body, array $headers = [], int $delay = 0, AmqpStamp $amqpStamp = null): void
175175
{
176176
if (0 !== $delay) {
177-
$this->publishWithDelay($body, $headers, $delay, $routingKey);
177+
$this->publishWithDelay($body, $headers, $delay, $amqpStamp);
178178

179179
return;
180180
}
@@ -183,13 +183,14 @@ public function publish(string $body, array $headers = [], int $delay = 0, strin
183183
$this->setup();
184184
}
185185

186-
$this->exchange()->publish(
186+
$this->publishOnExchange(
187+
$this->exchange(),
187188
$body,
188-
$routingKey ?? $this->getDefaultPublishRoutingKey(),
189-
AMQP_NOPARAM,
189+
(null !== $amqpStamp ? $amqpStamp->getRoutingKey() : null) ?? $this->getDefaultPublishRoutingKey(),
190190
[
191191
'headers' => $headers,
192-
]
192+
],
193+
$amqpStamp
193194
);
194195
}
195196

@@ -206,19 +207,30 @@ public function countMessagesInQueues(): int
206207
/**
207208
* @throws \AMQPException
208209
*/
209-
private function publishWithDelay(string $body, array $headers, int $delay, ?string $exchangeRoutingKey)
210+
private function publishWithDelay(string $body, array $headers, int $delay, AmqpStamp $amqpStamp = null)
210211
{
211212
if ($this->shouldSetup()) {
212-
$this->setupDelay($delay, $exchangeRoutingKey);
213+
$this->setupDelay($delay, $amqpStamp !== null ? $amqpStamp->getRoutingKey() : null);
213214
}
214215

215-
$this->getDelayExchange()->publish(
216+
$this->publishOnExchange(
217+
$this->getDelayExchange(),
216218
$body,
217219
$this->getRoutingKeyForDelay($delay),
218-
AMQP_NOPARAM,
219220
[
220221
'headers' => $headers,
221-
]
222+
],
223+
$amqpStamp
224+
);
225+
}
226+
227+
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
228+
{
229+
$exchange->publish(
230+
$body,
231+
$routingKey,
232+
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
233+
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
222234
);
223235
}
224236

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.