Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b727f59

Browse filesBrowse files
committed
feature #30375 [Messenger] Added transport agnostic exception (nikossvnk, lolmx)
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Added transport agnostic exception | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | yes | Deprecations? | no | Tests pass? | yes | Fixed tickets | #30346 | License | MIT | Doc PR | TODO As described in #30346, client code shouldn't care about which transport is currently used by the message bus. This pr adds a new generic exception that is thrown by the `AmqpSender` if the message couldn't be delivered. Commits ------- 7d6a3fa Updated changelog to document changes in AmqpReceiver 62a08ee Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportException b2b0640 Chain new exception with previous one 06c8404 forgot one backslash, my bad 93c1001 [Messenger] Added new TransportException which is thrown if transport could not send a message
2 parents a2aee03 + 7d6a3fa commit b727f59
Copy full SHA for b727f59

File tree

7 files changed

+145
-3
lines changed
Filter options

7 files changed

+145
-3
lines changed

‎UPGRADE-4.3.md

Copy file name to clipboardExpand all lines: UPGRADE-4.3.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ HttpFoundation
5050
* The `FileinfoMimeTypeGuesser` class has been deprecated,
5151
use `Symfony\Component\Mime\FileinfoMimeTypeGuesser` instead.
5252

53+
Messenger
54+
---------
55+
56+
* `Amqp` transport does not throw `\AMQPException` anymore, catch `TransportException` instead.
57+
5358
Routing
5459
-------
5560

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@ CHANGELOG
1111
changed from `Serializer` to `PhpSerializer` inside `AmqpReceiver`,
1212
`AmqpSender`, `AmqpTransport` and `AmqpTransportFactory`.
1313

14+
* Added `TransportException` to mark an exception transport-related
15+
16+
* [BC BREAK] If listening to exceptions while using `AmqpSender` or `AmqpReceiver`, `\AMQPException` is
17+
no longer thrown in favor of `TransportException`.
18+
1419
4.2.0
1520
-----
1621

+21Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Exception;
13+
14+
/**
15+
* @author Eric Masoero <em@studeal.fr>
16+
*
17+
* @experimental in 4.2
18+
*/
19+
class TransportException extends RuntimeException
20+
{
21+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+77Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,83 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
101101
throw new WillNeverWorkException('Well...');
102102
});
103103
}
104+
105+
/**
106+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
107+
*/
108+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
109+
{
110+
$serializer = new Serializer(
111+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
112+
);
113+
114+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
115+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
116+
$envelope->method('getHeaders')->willReturn([
117+
'type' => DummyMessage::class,
118+
]);
119+
120+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
121+
$connection->method('get')->willReturn($envelope);
122+
123+
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
124+
125+
$receiver = new AmqpReceiver($connection, $serializer);
126+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
127+
$receiver->stop();
128+
});
129+
}
130+
131+
/**
132+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
133+
*/
134+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
135+
{
136+
$serializer = new Serializer(
137+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
138+
);
139+
140+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
141+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
142+
$envelope->method('getHeaders')->willReturn([
143+
'type' => DummyMessage::class,
144+
]);
145+
146+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
147+
$connection->method('get')->willReturn($envelope);
148+
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
149+
150+
$receiver = new AmqpReceiver($connection, $serializer);
151+
$receiver->receive(function () {
152+
throw new WillNeverWorkException('Well...');
153+
});
154+
}
155+
156+
/**
157+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
158+
*/
159+
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
160+
{
161+
$serializer = new Serializer(
162+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
163+
);
164+
165+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
166+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
167+
$envelope->method('getHeaders')->willReturn([
168+
'type' => DummyMessage::class,
169+
]);
170+
171+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
172+
$connection->method('get')->willReturn($envelope);
173+
174+
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
175+
176+
$receiver = new AmqpReceiver($connection, $serializer);
177+
$receiver->receive(function () {
178+
throw new InterruptException('Well...');
179+
});
180+
}
104181
}
105182

106183
class InterruptException extends \Exception

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpSenderTest.php
+18Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,22 @@ public function testItSendsTheEncodedMessage()
3737
$sender = new AmqpSender($connection, $serializer);
3838
$sender->send($envelope);
3939
}
40+
41+
/**
42+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
43+
*/
44+
public function testItThrowsATransportExceptionIfItCannotSendTheMessage()
45+
{
46+
$envelope = new Envelope(new DummyMessage('Oy'));
47+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
48+
49+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
50+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
51+
52+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
53+
$connection->method('publish')->with($encoded['body'], $encoded['headers'])->willThrowException(new \AMQPException());
54+
55+
$sender = new AmqpSender($connection, $serializer);
56+
$sender->send($envelope);
57+
}
4058
}

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+13-2Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Symfony\Component\Messenger\Exception\TransportException;
1415
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -61,11 +62,21 @@ public function receive(callable $handler): void
6162

6263
$this->connection->ack($AMQPEnvelope);
6364
} catch (RejectMessageExceptionInterface $e) {
64-
$this->connection->reject($AMQPEnvelope);
65+
try {
66+
$this->connection->reject($AMQPEnvelope);
67+
} catch (\AMQPException $exception) {
68+
throw new TransportException($exception->getMessage(), 0, $exception);
69+
}
6570

6671
throw $e;
72+
} catch (\AMQPException $e) {
73+
throw new TransportException($e->getMessage(), 0, $e);
6774
} catch (\Throwable $e) {
68-
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
75+
try {
76+
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
77+
} catch (\AMQPException $exception) {
78+
throw new TransportException($exception->getMessage(), 0, $exception);
79+
}
6980

7081
throw $e;
7182
} finally {

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\TransportException;
1516
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -41,7 +42,11 @@ public function send(Envelope $envelope): Envelope
4142
{
4243
$encodedMessage = $this->serializer->encode($envelope);
4344

44-
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
45+
try {
46+
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
47+
} catch (\AMQPException $e) {
48+
throw new TransportException($e->getMessage(), 0, $e);
49+
}
4550

4651
return $envelope;
4752
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.