Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 091c35b

Browse filesBrowse files
committed
Improve SQS interoperability
1 parent 829566c commit 091c35b
Copy full SHA for 091c35b

File tree

2 files changed

+22
-6
lines changed
Filter options

2 files changed

+22
-6
lines changed

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/ConnectionTest.php
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public function testKeepGettingPendingMessages()
109109
$queueUrl = $this->handleGetQueueUrl(0, $httpClient);
110110

111111
$httpClient->expects($this->at(1))->method('request')
112-
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20]])
112+
->with('POST', $queueUrl, ['body' => ['Action' => 'ReceiveMessage', 'VisibilityTimeout' => null, 'MaxNumberOfMessages' => 9, 'WaitTimeSeconds' => 20, 'MessageAttributeName.1' => 'All']])
113113
->willReturn($response);
114114
$response->expects($this->once())->method('getContent')->willReturn('<ReceiveMessageResponse>
115115
<ReceiveMessageResult>

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php
+21-5Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ private function getNewMessages(): \Generator
176176
'Action' => 'ReceiveMessage',
177177
'VisibilityTimeout' => $this->configuration['visibility_timeout'],
178178
'MaxNumberOfMessages' => $this->configuration['buffer_size'],
179+
'MessageAttributeName.1' => 'All',
179180
'WaitTimeSeconds' => $this->configuration['wait_time'],
180181
]);
181182
}
@@ -186,9 +187,18 @@ private function getNewMessages(): \Generator
186187

187188
$xml = new \SimpleXMLElement($this->currentResponse->getContent());
188189
foreach ($xml->ReceiveMessageResult->Message as $xmlMessage) {
190+
$headers = [];
191+
foreach ($xmlMessage->MessageAttribute as $item) {
192+
if ('String' !== (string) $item->Value->DataType) {
193+
continue;
194+
}
195+
$headers[(string) $item->Name] = (string) $item->Value->StringValue;
196+
}
189197
$this->buffer[] = [
190198
'id' => (string) $xmlMessage->ReceiptHandle,
191-
] + json_decode($xmlMessage->Body, true);
199+
'body' => (string) $xmlMessage->Body,
200+
'headers' => $headers,
201+
];
192202
}
193203

194204
$this->currentResponse = null;
@@ -246,17 +256,23 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
246256
$this->setup();
247257
}
248258

249-
$messageBody = json_encode(['body' => $body, 'headers' => $headers]);
250-
251259
$parameters = [
252260
'Action' => 'SendMessage',
253-
'MessageBody' => $messageBody,
261+
'MessageBody' => $body,
254262
'DelaySeconds' => $delay,
255263
];
256264

265+
$index = 0;
266+
foreach ($headers as $name => $value) {
267+
++$index;
268+
$parameters["MessageAttribute.$index.Name"] = $name;
269+
$parameters["MessageAttribute.$index.Value.DataType"] = 'String';
270+
$parameters["MessageAttribute.$index.Value.StringValue"] = $value;
271+
}
272+
257273
if ($this->isFifoQueue($this->configuration['queue_name'])) {
258274
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
259-
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1($messageBody);
275+
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));
260276
}
261277

262278
$this->call($this->getQueueUrl(), $parameters);

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.