Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 36e91a7

Browse filesBrowse files
committed
Use AsyncAws to handle SQS communication
1 parent eda7aad commit 36e91a7
Copy full SHA for 36e91a7

File tree

6 files changed

+210
-286
lines changed
Filter options

6 files changed

+210
-286
lines changed

‎composer.json

Copy file name to clipboardExpand all lines: composer.json
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
"require-dev": {
105105
"amphp/http-client": "^4.2",
106106
"amphp/http-tunnel": "^1.0",
107+
"async-aws/sqs": "^0.3",
107108
"cache/integration-tests": "dev-master",
108109
"doctrine/annotations": "~1.0",
109110
"doctrine/cache": "~1.6",

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php
+124-136Lines changed: 124 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
15+
use AsyncAws\Sqs\SqsClient;
1416
use PHPUnit\Framework\TestCase;
17+
use Symfony\Component\HttpClient\MockHttpClient;
18+
use Symfony\Component\HttpClient\Response\MockResponse;
1519
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
16-
use Symfony\Component\Messenger\Exception\TransportException;
1720
use Symfony\Contracts\HttpClient\HttpClientInterface;
1821
use Symfony\Contracts\HttpClient\ResponseInterface;
1922

@@ -31,7 +34,7 @@ public function testFromDsn()
3134
{
3235
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
3336
$this->assertEquals(
34-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
37+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
3538
Connection::fromDsn('sqs://default/queue', [], $httpClient)
3639
);
3740
}
@@ -40,16 +43,16 @@ public function testFromDsnWithRegion()
4043
{
4144
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
4245
$this->assertEquals(
43-
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
44-
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
46+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
47+
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
4548
);
4649
}
4750

4851
public function testFromDsnWithCustomEndpoint()
4952
{
5053
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
5154
$this->assertEquals(
52-
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
55+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
5356
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
5457
);
5558
}
@@ -58,7 +61,7 @@ public function testFromDsnWithCustomEndpointAndPort()
5861
{
5962
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6063
$this->assertEquals(
61-
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
64+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
6265
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
6366
);
6467
}
@@ -67,7 +70,7 @@ public function testFromDsnWithOptions()
6770
{
6871
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6972
$this->assertEquals(
70-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
73+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
7174
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
7275
);
7376
}
@@ -76,126 +79,113 @@ public function testFromDsnWithQueryOptions()
7679
{
7780
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
7881
$this->assertEquals(
79-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
82+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
8083
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
8184
);
8285
}
8386

84-
private function handleGetQueueUrl(int $index, $mock): string
85-
{
86-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
87-
88-
$mock->expects($this->at($index))->method('request')
89-
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
90-
->willReturn($response);
91-
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
92-
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
93-
<GetQueueUrlResult>
94-
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
95-
</GetQueueUrlResult>
96-
<ResponseMetadata>
97-
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
98-
</ResponseMetadata>
99-
</GetQueueUrlResponse>');
100-
101-
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
102-
}
103-
10487
public function testKeepGettingPendingMessages()
10588
{
106-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
107-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
108-
109-
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
110-
111-
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
113-
->willReturn($response);
114-
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115-
<ReceiveMessageResult>
116-
<Message>
117-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
118-
<ReceiptHandle>
119-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
120-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
121-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
122-
</ReceiptHandle>
123-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
124-
<Body>{"body":"this is a test","headers":{}}</Body>
125-
<Attribute>
126-
<Name>SenderId</Name>
127-
<Value>195004372649</Value>
128-
</Attribute>
129-
<Attribute>
130-
<Name>SentTimestamp</Name>
131-
<Value>1238099229000</Value>
132-
</Attribute>
133-
<Attribute>
134-
<Name>ApproximateReceiveCount</Name>
135-
<Value>5</Value>
136-
</Attribute>
137-
<Attribute>
138-
<Name>ApproximateFirstReceiveTimestamp</Name>
139-
<Value>1250700979248</Value>
140-
</Attribute>
141-
</Message>
142-
<Message>
143-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
144-
<ReceiptHandle>
145-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
146-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
147-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
148-
</ReceiptHandle>
149-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
150-
<Body>{"body":"this is a test","headers":{}}</Body>
151-
<Attribute>
152-
<Name>SenderId</Name>
153-
<Value>195004372649</Value>
154-
</Attribute>
155-
<Attribute>
156-
<Name>SentTimestamp</Name>
157-
<Value>1238099229000</Value>
158-
</Attribute>
159-
<Attribute>
160-
<Name>ApproximateReceiveCount</Name>
161-
<Value>5</Value>
162-
</Attribute>
163-
<Attribute>
164-
<Name>ApproximateFirstReceiveTimestamp</Name>
165-
<Value>1250700979248</Value>
166-
</Attribute>
167-
</Message>
168-
<Message>
169-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
170-
<ReceiptHandle>
171-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
172-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
173-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
174-
</ReceiptHandle>
175-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
176-
<Body>{"body":"this is a test","headers":{}}</Body>
177-
<Attribute>
178-
<Name>SenderId</Name>
179-
<Value>195004372649</Value>
180-
</Attribute>
181-
<Attribute>
182-
<Name>SentTimestamp</Name>
183-
<Value>1238099229000</Value>
184-
</Attribute>
185-
<Attribute>
186-
<Name>ApproximateReceiveCount</Name>
187-
<Value>5</Value>
188-
</Attribute>
189-
<Attribute>
190-
<Name>ApproximateFirstReceiveTimestamp</Name>
191-
<Value>1250700979248</Value>
192-
</Attribute>
193-
</Message>
194-
</ReceiveMessageResult>
195-
<ResponseMetadata>
196-
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
197-
</ResponseMetadata>
198-
</ReceiveMessageResponse>');
89+
$httpClient = new MockHttpClient(function(string $method, string $url, array $options): ResponseInterface {
90+
if ($options['body'] === 'Action=GetQueueUrl&Version=2012-11-05&QueueName=queue') {
91+
return new MockResponse('<GetQueueUrlResponse>
92+
<GetQueueUrlResult>
93+
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
94+
</GetQueueUrlResult>
95+
<ResponseMetadata>
96+
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
97+
</ResponseMetadata>
98+
</GetQueueUrlResponse>');
99+
}
100+
if ($options['body'] === 'Action=ReceiveMessage&Version=2012-11-05&QueueUrl=https%3A%2F%2Fsqs.us-east-2.amazonaws.com%2F123456789012%2FMyQueue&MaxNumberOfMessages=9&WaitTimeSeconds=20') {
101+
return new MockResponse('<ReceiveMessageResponse>
102+
<ReceiveMessageResult>
103+
<Message>
104+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
105+
<ReceiptHandle>
106+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
107+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
108+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
109+
</ReceiptHandle>
110+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
111+
<Body>{"body":"this is a test","headers":{}}</Body>
112+
<Attribute>
113+
<Name>SenderId</Name>
114+
<Value>195004372649</Value>
115+
</Attribute>
116+
<Attribute>
117+
<Name>SentTimestamp</Name>
118+
<Value>1238099229000</Value>
119+
</Attribute>
120+
<Attribute>
121+
<Name>ApproximateReceiveCount</Name>
122+
<Value>5</Value>
123+
</Attribute>
124+
<Attribute>
125+
<Name>ApproximateFirstReceiveTimestamp</Name>
126+
<Value>1250700979248</Value>
127+
</Attribute>
128+
</Message>
129+
<Message>
130+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
131+
<ReceiptHandle>
132+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
133+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
134+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
135+
</ReceiptHandle>
136+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
137+
<Body>{"body":"this is a test","headers":{}}</Body>
138+
<Attribute>
139+
<Name>SenderId</Name>
140+
<Value>195004372649</Value>
141+
</Attribute>
142+
<Attribute>
143+
<Name>SentTimestamp</Name>
144+
<Value>1238099229000</Value>
145+
</Attribute>
146+
<Attribute>
147+
<Name>ApproximateReceiveCount</Name>
148+
<Value>5</Value>
149+
</Attribute>
150+
<Attribute>
151+
<Name>ApproximateFirstReceiveTimestamp</Name>
152+
<Value>1250700979248</Value>
153+
</Attribute>
154+
</Message>
155+
<Message>
156+
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
157+
<ReceiptHandle>
158+
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
159+
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
160+
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
161+
</ReceiptHandle>
162+
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
163+
<Body>{"body":"this is a test","headers":{}}</Body>
164+
<Attribute>
165+
<Name>SenderId</Name>
166+
<Value>195004372649</Value>
167+
</Attribute>
168+
<Attribute>
169+
<Name>SentTimestamp</Name>
170+
<Value>1238099229000</Value>
171+
</Attribute>
172+
<Attribute>
173+
<Name>ApproximateReceiveCount</Name>
174+
<Value>5</Value>
175+
</Attribute>
176+
<Attribute>
177+
<Name>ApproximateFirstReceiveTimestamp</Name>
178+
<Value>1250700979248</Value>
179+
</Attribute>
180+
</Message>
181+
</ReceiveMessageResult>
182+
<ResponseMetadata>
183+
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
184+
</ResponseMetadata>
185+
</ReceiveMessageResponse>');
186+
}
187+
$this->fail('Unexpected HTTP call');
188+
});
199189

200190
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
201191
$this->assertNotNull($connection->get());
@@ -205,23 +195,21 @@ public function testKeepGettingPendingMessages()
205195

206196
public function testUnexpectedSqsError()
207197
{
208-
$this->expectException(TransportException::class);
198+
$this->expectException(HttpException::class);
209199
$this->expectExceptionMessage('SQS error happens');
210200

211-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
212-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
213-
214-
$httpClient->expects($this->once())->method('request')->willReturn($response);
215-
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
216-
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
217-
<Error>
218-
<Type>Sender</Type>
219-
<Code>boom</Code>
220-
<Message>SQS error happens</Message>
221-
<Detail/>
222-
</Error>
223-
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
224-
</ErrorResponse>');
201+
$httpClient = new MockHttpClient(function(string $method, string $url, array $options): ResponseInterface {
202+
return new MockResponse('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
203+
<Error>
204+
<Type>Sender</Type>
205+
<Code>boom</Code>
206+
<Message>SQS error happens</Message>
207+
<Detail/>
208+
</Error>
209+
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
210+
</ErrorResponse>', ['http_code'=>400]);
211+
});
212+
225213
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
226214
$connection->get();
227215
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php
+5-5Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -19,7 +20,6 @@
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
22-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -42,7 +42,7 @@ public function get(): iterable
4242
{
4343
try {
4444
$sqsEnvelope = $this->connection->get();
45-
} catch (HttpExceptionInterface $e) {
45+
} catch (HttpException $e) {
4646
throw new TransportException($e->getMessage(), 0, $e);
4747
}
4848
if (null === $sqsEnvelope) {
@@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
7070
{
7171
try {
7272
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
73-
} catch (HttpExceptionInterface $e) {
73+
} catch (HttpException $e) {
7474
throw new TransportException($e->getMessage(), 0, $e);
7575
}
7676
}
@@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
8282
{
8383
try {
8484
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
85-
} catch (HttpExceptionInterface $e) {
85+
} catch (HttpException $e) {
8686
throw new TransportException($e->getMessage(), 0, $e);
8787
}
8888
}
@@ -94,7 +94,7 @@ public function getMessageCount(): int
9494
{
9595
try {
9696
return $this->connection->getMessageCount();
97-
} catch (HttpExceptionInterface $e) {
97+
} catch (HttpException $e) {
9898
throw new TransportException($e->getMessage(), 0, $e);
9999
}
100100
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.