Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 62a08ee

Browse filesBrowse files
committed
Updated exception message in AmqpSender, updated AmqpReceiver to throw new TransportException
1 parent b2b0640 commit 62a08ee
Copy full SHA for 62a08ee

File tree

3 files changed

+91
-3
lines changed
Filter options

3 files changed

+91
-3
lines changed

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpReceiverTest.php
+77Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,83 @@ public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionIn
101101
throw new WillNeverWorkException('Well...');
102102
});
103103
}
104+
105+
/**
106+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
107+
*/
108+
public function testItThrowsATransportExceptionIfItCannotAcknowledgeMessage()
109+
{
110+
$serializer = new Serializer(
111+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
112+
);
113+
114+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
115+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
116+
$envelope->method('getHeaders')->willReturn([
117+
'type' => DummyMessage::class,
118+
]);
119+
120+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
121+
$connection->method('get')->willReturn($envelope);
122+
123+
$connection->method('ack')->with($envelope)->willThrowException(new \AMQPException());
124+
125+
$receiver = new AmqpReceiver($connection, $serializer);
126+
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
127+
$receiver->stop();
128+
});
129+
}
130+
131+
/**
132+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
133+
*/
134+
public function testItThrowsATransportExceptionIfItCannotRejectMessage()
135+
{
136+
$serializer = new Serializer(
137+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
138+
);
139+
140+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
141+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
142+
$envelope->method('getHeaders')->willReturn([
143+
'type' => DummyMessage::class,
144+
]);
145+
146+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
147+
$connection->method('get')->willReturn($envelope);
148+
$connection->method('reject')->with($envelope)->willThrowException(new \AMQPException());
149+
150+
$receiver = new AmqpReceiver($connection, $serializer);
151+
$receiver->receive(function () {
152+
throw new WillNeverWorkException('Well...');
153+
});
154+
}
155+
156+
/**
157+
* @expectedException \Symfony\Component\Messenger\Exception\TransportException
158+
*/
159+
public function testItThrowsATransportExceptionIfItCannotNonAcknowledgeMessage()
160+
{
161+
$serializer = new Serializer(
162+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
163+
);
164+
165+
$envelope = $this->getMockBuilder(\AMQPEnvelope::class)->getMock();
166+
$envelope->method('getBody')->willReturn('{"message": "Hi"}');
167+
$envelope->method('getHeaders')->willReturn([
168+
'type' => DummyMessage::class,
169+
]);
170+
171+
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
172+
$connection->method('get')->willReturn($envelope);
173+
174+
$connection->method('nack')->with($envelope)->willThrowException(new \AMQPException());
175+
176+
$receiver = new AmqpReceiver($connection, $serializer);
177+
$receiver->receive(function () {
178+
throw new InterruptException('Well...');
179+
});
180+
}
104181
}
105182

106183
class InterruptException extends \Exception

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
+13-2Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14+
use Symfony\Component\Messenger\Exception\TransportException;
1415
use Symfony\Component\Messenger\Transport\AmqpExt\Exception\RejectMessageExceptionInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -61,11 +62,21 @@ public function receive(callable $handler): void
6162

6263
$this->connection->ack($AMQPEnvelope);
6364
} catch (RejectMessageExceptionInterface $e) {
64-
$this->connection->reject($AMQPEnvelope);
65+
try {
66+
$this->connection->reject($AMQPEnvelope);
67+
} catch (\AMQPException $exception) {
68+
throw new TransportException($exception->getMessage(), 0, $exception);
69+
}
6570

6671
throw $e;
72+
} catch (\AMQPException $e) {
73+
throw new TransportException($e->getMessage(), 0, $e);
6774
} catch (\Throwable $e) {
68-
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
75+
try {
76+
$this->connection->nack($AMQPEnvelope, AMQP_REQUEUE);
77+
} catch (\AMQPException $exception) {
78+
throw new TransportException($exception->getMessage(), 0, $exception);
79+
}
6980

7081
throw $e;
7182
} finally {

‎src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpSender.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public function send(Envelope $envelope): Envelope
4545
try {
4646
$this->connection->publish($encodedMessage['body'], $encodedMessage['headers']);
4747
} catch (\AMQPException $e) {
48-
throw new TransportException('Current transport was not able to send given message, please try again', 0, $e);
48+
throw new TransportException($e->getMessage(), 0, $e);
4949
}
5050

5151
return $envelope;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.