Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 5fa7ff9

Browse filesBrowse files
committed
[Messenger] Added X-Ray trace header support to the SQS transport
1 parent 3ca3de5 commit 5fa7ff9
Copy full SHA for 5fa7ff9

File tree

5 files changed

+69
-2
lines changed
Filter options

5 files changed

+69
-2
lines changed

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ CHANGELOG
66

77
* Added new `debug` option to log HTTP requests and responses.
88
* Allowed for receiver & sender injection into AmazonSqsTransport
9+
* Add X-Ray trace header support to the SQS transport
910

1011
5.2.0
1112
-----

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Tests/Transport/AmazonSqsSenderTest.php
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Symfony\Component\Messenger\Bridge\AmazonSqs\Tests\Fixtures\DummyMessage;
1616
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsFifoStamp;
1717
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsSender;
18+
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsXrayTraceHeaderStamp;
1819
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\Connection;
1920
use Symfony\Component\Messenger\Envelope;
2021
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -53,4 +54,22 @@ public function testSendWithAmazonSqsFifoStamp()
5354
$sender = new AmazonSqsSender($connection, $serializer);
5455
$sender->send($envelope);
5556
}
57+
58+
public function testSendWithAmazonSqsXrayTraceHeaderStamp()
59+
{
60+
$envelope = (new Envelope(new DummyMessage('Oy')))
61+
->with($stamp = new AmazonSqsXrayTraceHeaderStamp('traceHeader'));
62+
63+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
64+
65+
$connection = $this->createMock(Connection::class);
66+
$connection->expects($this->once())->method('send')
67+
->with($encoded['body'], $encoded['headers'], 0, null, null, $stamp->getTraceId());
68+
69+
$serializer = $this->createMock(SerializerInterface::class);
70+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
71+
72+
$sender = new AmazonSqsSender($connection, $serializer);
73+
$sender->send($envelope);
74+
}
5675
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/AmazonSqsSender.php
+9-1Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public function send(Envelope $envelope): Envelope
4545

4646
$messageGroupId = null;
4747
$messageDeduplicationId = null;
48+
$xrayTraceId = null;
4849

4950
/** @var AmazonSqsFifoStamp|null $amazonSqsFifoStamp */
5051
$amazonSqsFifoStamp = $envelope->last(AmazonSqsFifoStamp::class);
@@ -53,13 +54,20 @@ public function send(Envelope $envelope): Envelope
5354
$messageDeduplicationId = $amazonSqsFifoStamp->getMessageDeduplicationId();
5455
}
5556

57+
/** @var AmazonSqsXrayTraceHeaderStamp|null $amazonSqsXrayTraceHeaderStamp */
58+
$amazonSqsXrayTraceHeaderStamp = $envelope->last(AmazonSqsXrayTraceHeaderStamp::class);
59+
if (null !== $amazonSqsXrayTraceHeaderStamp) {
60+
$xrayTraceId = $amazonSqsXrayTraceHeaderStamp->getTraceId();
61+
}
62+
5663
try {
5764
$this->connection->send(
5865
$encodedMessage['body'],
5966
$encodedMessage['headers'] ?? [],
6067
$delay,
6168
$messageGroupId,
62-
$messageDeduplicationId
69+
$messageDeduplicationId,
70+
$xrayTraceId
6371
);
6472
} catch (HttpException $e) {
6573
throw new TransportException($e->getMessage(), 0, $e);
+29Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
13+
14+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
15+
16+
final class AmazonSqsXrayTraceHeaderStamp implements NonSendableStampInterface
17+
{
18+
private $traceId;
19+
20+
public function __construct(string $traceId)
21+
{
22+
$this->traceId = $traceId;
23+
}
24+
25+
public function getTraceId(): string
26+
{
27+
return $this->traceId;
28+
}
29+
}

‎src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/AmazonSqs/Transport/Connection.php
+11-1Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@
1111

1212
namespace Symfony\Component\Messenger\Bridge\AmazonSqs\Transport;
1313

14+
use AsyncAws\Sqs\Enum\MessageSystemAttributeName;
1415
use AsyncAws\Sqs\Enum\QueueAttributeName;
1516
use AsyncAws\Sqs\Result\ReceiveMessageResult;
1617
use AsyncAws\Sqs\SqsClient;
1718
use AsyncAws\Sqs\ValueObject\MessageAttributeValue;
19+
use AsyncAws\Sqs\ValueObject\MessageSystemAttributeValue;
1820
use Psr\Log\LoggerInterface;
1921
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
2022
use Symfony\Component\Messenger\Exception\TransportException;
@@ -312,7 +314,7 @@ public function getMessageCount(): int
312314
return (int) ($attributes[QueueAttributeName::APPROXIMATE_NUMBER_OF_MESSAGES] ?? 0);
313315
}
314316

315-
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null): void
317+
public function send(string $body, array $headers, int $delay = 0, ?string $messageGroupId = null, ?string $messageDeduplicationId = null, ?string $xrayTraceId = null): void
316318
{
317319
if ($this->configuration['auto_setup']) {
318320
$this->setup();
@@ -323,6 +325,7 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
323325
'MessageBody' => $body,
324326
'DelaySeconds' => $delay,
325327
'MessageAttributes' => [],
328+
'MessageSystemAttributes' => [],
326329
];
327330

328331
$specialHeaders = [];
@@ -346,6 +349,13 @@ public function send(string $body, array $headers, int $delay = 0, ?string $mess
346349
]);
347350
}
348351

352+
if (null !== $xrayTraceId) {
353+
$parameters['MessageSystemAttributes'][MessageSystemAttributeName::AWSTRACE_HEADER] = new MessageSystemAttributeValue([
354+
'DataType' => 'String',
355+
'StringValue' => $xrayTraceId,
356+
]);
357+
}
358+
349359
if (self::isFifoQueue($this->configuration['queue_name'])) {
350360
$parameters['MessageGroupId'] = null !== $messageGroupId ? $messageGroupId : __METHOD__;
351361
$parameters['MessageDeduplicationId'] = null !== $messageDeduplicationId ? $messageDeduplicationId : sha1(json_encode(['body' => $body, 'headers' => $headers]));

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.