Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 017420b

Browse filesBrowse files
committed
feature #36094 [AmazonSqsMessenger] Use AsyncAws to handle SQS communication (jderusse)
This PR was squashed before being merged into the 5.1-dev branch. Discussion ---------- [AmazonSqsMessenger] Use AsyncAws to handle SQS communication | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | / | License | MIT | Doc PR | / Similar to #35992 this PR use AsyncAws to handle Sqs messages sent/receive It move complexity of authentication/streaming outside Symfony while keeping HttpClient integration. Commits ------- 7c4888e [AmazonSqsMessenger] Use AsyncAws to handle SQS communication
2 parents 87a5701 + 7c4888e commit 017420b
Copy full SHA for 017420b

File tree

8 files changed

+174
-317
lines changed
Filter options

8 files changed

+174
-317
lines changed

‎composer.json

Copy file name to clipboardExpand all lines: composer.json
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
"amphp/http-client": "^4.2",
106106
"amphp/http-tunnel": "^1.0",
107107
"async-aws/ses": "^1.0",
108+
"async-aws/sqs": "^1.0",
108109
"cache/integration-tests": "dev-master",
109110
"doctrine/annotations": "~1.0",
110111
"doctrine/cache": "~1.6",

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsIntegrationTest.php
+8-10Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Sqs\SqsClient;
1415
use PHPUnit\Framework\TestCase;
1516
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
1617
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
@@ -39,7 +40,7 @@ private function execute(string $dsn): void
3940
{
4041
$connection = Connection::fromDsn($dsn, []);
4142
$connection->setup();
42-
$this->clearSqs($connection);
43+
$this->clearSqs($dsn);
4344

4445
$connection->send('{"message": "Hi"}', ['type' => DummyMessage::class]);
4546
$this->assertSame(1, $connection->getMessageCount());
@@ -53,15 +54,12 @@ private function execute(string $dsn): void
5354
$this->assertEquals(['type' => DummyMessage::class], $encoded['headers']);
5455
}
5556

56-
private function clearSqs(Connection $connection): void
57+
private function clearSqs(string $dsn): void
5758
{
58-
$wait = 0;
59-
while ($wait++ < 50) {
60-
if (null === $message = $connection->get()) {
61-
usleep(5000);
62-
continue;
63-
}
64-
$connection->delete($message['id']);
65-
}
59+
$url = parse_url($dsn);
60+
$client = new SqsClient(['endpoint' => "http://{$url['host']}:{$url['port']}"]);
61+
$client->purgeQueue([
62+
'QueueUrl' => $client->getQueueUrl(['QueueName' => ltrim($url['path'], '/')])->getQueueUrl(),
63+
]);
6664
}
6765
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php
+54-140Lines changed: 54 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,15 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
15+
use AsyncAws\Core\Test\ResultMockFactory;
16+
use AsyncAws\Sqs\Result\GetQueueUrlResult;
17+
use AsyncAws\Sqs\Result\ReceiveMessageResult;
18+
use AsyncAws\Sqs\SqsClient;
19+
use AsyncAws\Sqs\ValueObject\Message;
1420
use PHPUnit\Framework\TestCase;
1521
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
16-
use Symfony\Component\Messenger\Exception\TransportException;
1722
use Symfony\Contracts\HttpClient\HttpClientInterface;
18-
use Symfony\Contracts\HttpClient\ResponseInterface;
1923

2024
class ConnectionTest extends TestCase
2125
{
@@ -31,7 +35,7 @@ public function testFromDsn()
3135
{
3236
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
3337
$this->assertEquals(
34-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'queue_name' => 'queue'], $httpClient),
38+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
3539
Connection::fromDsn('sqs://default/queue', [], $httpClient)
3640
);
3741
}
@@ -40,16 +44,16 @@ public function testFromDsnWithRegion()
4044
{
4145
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
4246
$this->assertEquals(
43-
new Connection(['endpoint' => 'https://sqs.us-east-1.amazonaws.com', 'queue_name' => 'queue', 'region' => 'us-east-1'], $httpClient),
44-
Connection::fromDsn('sqs://default/queue?region=us-east-1', [], $httpClient)
47+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'us-west-2', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
48+
Connection::fromDsn('sqs://default/queue?region=us-west-2', [], $httpClient)
4549
);
4650
}
4751

4852
public function testFromDsnWithCustomEndpoint()
4953
{
5054
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
5155
$this->assertEquals(
52-
new Connection(['endpoint' => 'https://localhost', 'queue_name' => 'queue'], $httpClient),
56+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
5357
Connection::fromDsn('sqs://localhost/queue', [], $httpClient)
5458
);
5559
}
@@ -58,7 +62,7 @@ public function testFromDsnWithCustomEndpointAndPort()
5862
{
5963
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6064
$this->assertEquals(
61-
new Connection(['endpoint' => 'https://localhost:1234', 'queue_name' => 'queue'], $httpClient),
65+
new Connection(['queue_name' => 'queue'], new SqsClient(['region' => 'eu-west-1', 'endpoint' => 'https://localhost:1234', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
6266
Connection::fromDsn('sqs://localhost:1234/queue', [], $httpClient)
6367
);
6468
}
@@ -67,7 +71,7 @@ public function testFromDsnWithOptions()
6771
{
6872
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
6973
$this->assertEquals(
70-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
74+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
7175
Connection::fromDsn('sqs://default/213/queue', ['buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient)
7276
);
7377
}
@@ -76,153 +80,63 @@ public function testFromDsnWithQueryOptions()
7680
{
7781
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
7882
$this->assertEquals(
79-
new Connection(['endpoint' => 'https://sqs.eu-west-1.amazonaws.com', 'account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], $httpClient),
83+
new Connection(['account' => '213', 'queue_name' => 'queue', 'buffer_size' => 1, 'wait_time' => 5, 'auto_setup' => false], new SqsClient(['region' => 'eu-west-1', 'accessKeyId' => null, 'accessKeySecret' => null], null, $httpClient)),
8084
Connection::fromDsn('sqs://default/213/queue?buffer_size=1&wait_time=5&auto_setup=0', [], $httpClient)
8185
);
8286
}
8387

84-
private function handleGetQueueUrl(int $index, $mock): string
85-
{
86-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
87-
88-
$mock->expects($this->at($index))->method('request')
89-
->with('POST', 'https://localhost', ['body' => ['Action' => 'GetQueueUrl', 'QueueName' => 'queue']])
90-
->willReturn($response);
91-
$response->expects($this->once())->method('getStatusCode')->willReturn(200);
92-
$response->expects($this->once())->method('getContent')->willReturn('<GetQueueUrlResponse>
93-
<GetQueueUrlResult>
94-
<QueueUrl>https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue</QueueUrl>
95-
</GetQueueUrlResult>
96-
<ResponseMetadata>
97-
<RequestId>470a6f13-2ed9-4181-ad8a-2fdea142988e</RequestId>
98-
</ResponseMetadata>
99-
</GetQueueUrlResponse>');
100-
101-
return 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue';
102-
}
103-
10488
public function testKeepGettingPendingMessages()
10589
{
106-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
107-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
108-
109-
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
110-
111-
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
113-
->willReturn($response);
114-
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115-
<ReceiveMessageResult>
116-
<Message>
117-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
118-
<ReceiptHandle>
119-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
120-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
121-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
122-
</ReceiptHandle>
123-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
124-
<Body>{"body":"this is a test","headers":{}}</Body>
125-
<Attribute>
126-
<Name>SenderId</Name>
127-
<Value>195004372649</Value>
128-
</Attribute>
129-
<Attribute>
130-
<Name>SentTimestamp</Name>
131-
<Value>1238099229000</Value>
132-
</Attribute>
133-
<Attribute>
134-
<Name>ApproximateReceiveCount</Name>
135-
<Value>5</Value>
136-
</Attribute>
137-
<Attribute>
138-
<Name>ApproximateFirstReceiveTimestamp</Name>
139-
<Value>1250700979248</Value>
140-
</Attribute>
141-
</Message>
142-
<Message>
143-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
144-
<ReceiptHandle>
145-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
146-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
147-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
148-
</ReceiptHandle>
149-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
150-
<Body>{"body":"this is a test","headers":{}}</Body>
151-
<Attribute>
152-
<Name>SenderId</Name>
153-
<Value>195004372649</Value>
154-
</Attribute>
155-
<Attribute>
156-
<Name>SentTimestamp</Name>
157-
<Value>1238099229000</Value>
158-
</Attribute>
159-
<Attribute>
160-
<Name>ApproximateReceiveCount</Name>
161-
<Value>5</Value>
162-
</Attribute>
163-
<Attribute>
164-
<Name>ApproximateFirstReceiveTimestamp</Name>
165-
<Value>1250700979248</Value>
166-
</Attribute>
167-
</Message>
168-
<Message>
169-
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
170-
<ReceiptHandle>
171-
MbZj6wDWli+JvwwJaBV+3dcjk2YW2vA3+STFFljTM8tJJg6HRG6PYSasuWXPJB+Cw
172-
Lj1FjgXUv1uSj1gUPAWV66FU/WeR4mq2OKpEGYWbnLmpRCJVAyeMjeU5ZBdtcQ+QE
173-
auMZc8ZRv37sIW2iJKq3M9MFx1YvV11A2x/KSbkJ0=
174-
</ReceiptHandle>
175-
<MD5OfBody>fafb00f5732ab283681e124bf8747ed1</MD5OfBody>
176-
<Body>{"body":"this is a test","headers":{}}</Body>
177-
<Attribute>
178-
<Name>SenderId</Name>
179-
<Value>195004372649</Value>
180-
</Attribute>
181-
<Attribute>
182-
<Name>SentTimestamp</Name>
183-
<Value>1238099229000</Value>
184-
</Attribute>
185-
<Attribute>
186-
<Name>ApproximateReceiveCount</Name>
187-
<Value>5</Value>
188-
</Attribute>
189-
<Attribute>
190-
<Name>ApproximateFirstReceiveTimestamp</Name>
191-
<Value>1250700979248</Value>
192-
</Attribute>
193-
</Message>
194-
</ReceiveMessageResult>
195-
<ResponseMetadata>
196-
<RequestId>b6633655-283d-45b4-aee4-4e84e0ae6afa</RequestId>
197-
</ResponseMetadata>
198-
</ReceiveMessageResponse>');
199-
200-
$connection = Connection::fromDsn('sqs://localhost/queue', ['auto_setup' => false], $httpClient);
90+
$client = $this->createMock(SqsClient::class);
91+
$client->expects($this->any())
92+
->method('getQueueUrl')
93+
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
94+
->willReturn(ResultMockFactory::create(GetQueueUrlResult::class, ['QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue']));
95+
$client->expects($this->at(1))
96+
->method('receiveMessage')
97+
->with([
98+
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
99+
'MaxNumberOfMessages' => 9,
100+
'WaitTimeSeconds' => 20,
101+
'MessageAttributeNames' => ['All'],
102+
'VisibilityTimeout' => null,
103+
])
104+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
105+
new Message(['MessageId' => 1, 'Body' => 'this is a test']),
106+
new Message(['MessageId' => 2, 'Body' => 'this is a test']),
107+
new Message(['MessageId' => 3, 'Body' => 'this is a test']),
108+
]]));
109+
$client->expects($this->at(2))
110+
->method('receiveMessage')
111+
->with([
112+
'QueueUrl' => 'https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue',
113+
'MaxNumberOfMessages' => 9,
114+
'WaitTimeSeconds' => 20,
115+
'MessageAttributeNames' => ['All'],
116+
'VisibilityTimeout' => null,
117+
])
118+
->willReturn(ResultMockFactory::create(ReceiveMessageResult::class, ['Messages' => [
119+
]]));
120+
121+
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
201122
$this->assertNotNull($connection->get());
202123
$this->assertNotNull($connection->get());
203124
$this->assertNotNull($connection->get());
125+
$this->assertNull($connection->get());
204126
}
205127

206128
public function testUnexpectedSqsError()
207129
{
208-
$this->expectException(TransportException::class);
130+
$this->expectException(HttpException::class);
209131
$this->expectExceptionMessage('SQS error happens');
210132

211-
$httpClient = $this->getMockBuilder(HttpClientInterface::class)->getMock();
212-
$response = $this->getMockBuilder(ResponseInterface::class)->getMock();
213-
214-
$httpClient->expects($this->once())->method('request')->willReturn($response);
215-
$response->expects($this->once())->method('getStatusCode')->willReturn(400);
216-
$response->expects($this->once())->method('getContent')->willReturn('<ErrorResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">
217-
<Error>
218-
<Type>Sender</Type>
219-
<Code>boom</Code>
220-
<Message>SQS error happens</Message>
221-
<Detail/>
222-
</Error>
223-
<RequestId>30441e49-5246-5231-9c87-4bd704b81ce9</RequestId>
224-
</ErrorResponse>');
225-
$connection = Connection::fromDsn('sqs://localhost/queue', [], $httpClient);
133+
$client = $this->createMock(SqsClient::class);
134+
$client->expects($this->any())
135+
->method('getQueueUrl')
136+
->with(['QueueName' => 'queue', 'QueueOwnerAWSAccountId' => 123])
137+
->willReturn(ResultMockFactory::createFailing(GetQueueUrlResult::class, 400, 'SQS error happens'));
138+
139+
$connection = new Connection(['queue_name' => 'queue', 'account' => 123, 'auto_setup' => false], $client);
226140
$connection->get();
227141
}
228142
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsReceiver.php
+5-5Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\LogicException;
1617
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
@@ -19,7 +20,6 @@
1920
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
22-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2323

2424
/**
2525
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -42,7 +42,7 @@ public function get(): iterable
4242
{
4343
try {
4444
$sqsEnvelope = $this->connection->get();
45-
} catch (HttpExceptionInterface $e) {
45+
} catch (HttpException $e) {
4646
throw new TransportException($e->getMessage(), 0, $e);
4747
}
4848
if (null === $sqsEnvelope) {
@@ -70,7 +70,7 @@ public function ack(Envelope $envelope): void
7070
{
7171
try {
7272
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
73-
} catch (HttpExceptionInterface $e) {
73+
} catch (HttpException $e) {
7474
throw new TransportException($e->getMessage(), 0, $e);
7575
}
7676
}
@@ -82,7 +82,7 @@ public function reject(Envelope $envelope): void
8282
{
8383
try {
8484
$this->connection->delete($this->findSqsReceivedStamp($envelope)->getId());
85-
} catch (HttpExceptionInterface $e) {
85+
} catch (HttpException $e) {
8686
throw new TransportException($e->getMessage(), 0, $e);
8787
}
8888
}
@@ -94,7 +94,7 @@ public function getMessageCount(): int
9494
{
9595
try {
9696
return $this->connection->getMessageCount();
97-
} catch (HttpExceptionInterface $e) {
97+
} catch (HttpException $e) {
9898
throw new TransportException($e->getMessage(), 0, $e);
9999
}
100100
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Core\Exception\Http\HttpException;
1415
use Symfony\Component\Messenger\Envelope;
1516
use Symfony\Component\Messenger\Exception\TransportException;
1617
use Symfony\Component\Messenger\Stamp\DelayStamp;
1718
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
1819
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
19-
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
2020

2121
/**
2222
* @author Jérémy Derussé <jeremy@derusse.com>
@@ -61,7 +61,7 @@ public function send(Envelope $envelope): Envelope
6161
$messageGroupId,
6262
$messageDeduplicationId
6363
);
64-
} catch (HttpExceptionInterface $e) {
64+
} catch (HttpException $e) {
6565
throw new TransportException($e->getMessage(), 0, $e);
6666
}
6767

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.