From 9f4b8a4cf51bb58f5fa79f097cf0190c53b4ea87 Mon Sep 17 00:00:00 2001 From: HypeMC Date: Sun, 12 Mar 2023 21:47:20 +0100 Subject: [PATCH] [Messenger] Add `bury_on_reject` option to Beanstalkd bridge --- .../Messenger/Bridge/Beanstalkd/CHANGELOG.md | 1 + .../Transport/BeanstalkdReceiverTest.php | 34 ++++++- .../Tests/Transport/ConnectionTest.php | 97 +++++++++++++++++-- .../Transport/BeanstalkdReceiver.php | 12 ++- .../Beanstalkd/Transport/Connection.php | 33 ++++++- .../Messenger/Bridge/Beanstalkd/composer.json | 2 +- src/Symfony/Component/Messenger/CHANGELOG.md | 5 + .../SendFailedMessageForRetryListener.php | 3 + .../Messenger/Stamp/SentForRetryStamp.php | 23 +++++ .../SendFailedMessageForRetryListenerTest.php | 58 +++++++++++ 10 files changed, 254 insertions(+), 14 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Stamp/SentForRetryStamp.php diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md index 4ab15d4a14106..d1ae93d19ad54 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md @@ -5,6 +5,7 @@ CHANGELOG --- * Add `BeanstalkdPriorityStamp` option to allow setting the message priority + * Add `bury_on_reject` option to bury failed messages instead of deleting them 7.2 --- diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php index beba416297b9f..eeab9492e13f6 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php @@ -13,11 +13,13 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage; +use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Stamp\SentForRetryStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer; use Symfony\Component\Messenger\Transport\Serialization\Serializer; @@ -81,12 +83,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException() $beanstalkdEnvelope = $this->createBeanstalkdEnvelope(); $connection = $this->createMock(Connection::class); $connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope); - $connection->expects($this->once())->method('reject'); + $connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2); + $connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2); $receiver = new BeanstalkdReceiver($connection, $serializer); $receiver->get(); } + /** + * @dataProvider provideRejectCases + */ + public function testReject(array $stamps, ?int $priority, bool $forceDelete) + { + $serializer = $this->createSerializer(); + + $id = 'some id'; + + $connection = $this->createMock(Connection::class); + $connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete); + + $envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar')); + foreach ($stamps as $stamp) { + $envelope = $envelope->with($stamp); + } + + $receiver = new BeanstalkdReceiver($connection, $serializer); + $receiver->reject($envelope); + } + + public static function provideRejectCases(): iterable + { + yield 'No stamp' => [[], null, false]; + yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true]; + yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true]; + yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false]; + } + public function testKeepalive() { $serializer = $this->createSerializer(); diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php index 5673361f785f5..1480e8e56c372 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php @@ -47,6 +47,7 @@ public function testFromDsn() $this->assertSame('default', $configuration['tube_name']); $this->assertSame(0, $configuration['timeout']); $this->assertSame(90, $configuration['ttr']); + $this->assertFalse($configuration['bury_on_reject']); $this->assertEquals( $connection = new Connection([], Pheanstalk::create('foobar', 15555)), @@ -58,22 +59,32 @@ public function testFromDsn() $this->assertSame('default', $configuration['tube_name']); $this->assertSame(0, $configuration['timeout']); $this->assertSame(90, $configuration['ttr']); + $this->assertFalse($configuration['bury_on_reject']); $this->assertSame('default', $connection->getTube()); } public function testFromDsnWithOptions() { $this->assertEquals( - $connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]), - Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000') + $connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]), + $connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true') ); - $configuration = $connection->getConfiguration(); + $configuration = $connectionWithOptions->getConfiguration(); $this->assertSame('foo', $configuration['tube_name']); $this->assertSame(10, $configuration['timeout']); $this->assertSame(5000, $configuration['ttr']); - $this->assertSame('foo', $connection->getTube()); + $this->assertTrue($configuration['bury_on_reject']); + $this->assertSame('foo', $connectionWithOptions->getTube()); + + $configuration = $connectionWithQuery->getConfiguration(); + + $this->assertSame('foo', $configuration['tube_name']); + $this->assertSame(10, $configuration['timeout']); + $this->assertSame(5000, $configuration['ttr']); + $this->assertTrue($configuration['bury_on_reject']); + $this->assertSame('foo', $connectionWithOptions->getTube()); } public function testFromDsnOptionsArrayWinsOverOptionsFromDsn() @@ -82,11 +93,12 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn() 'tube_name' => 'bar', 'timeout' => 20, 'ttr' => 6000, + 'bury_on_reject' => false, ]; $this->assertEquals( $connection = new Connection($options, Pheanstalk::create('localhost', 11333)), - Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options) + Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options) ); $configuration = $connection->getConfiguration(); @@ -94,6 +106,7 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn() $this->assertSame($options['tube_name'], $configuration['tube_name']); $this->assertSame($options['timeout'], $configuration['timeout']); $this->assertSame($options['ttr'], $configuration['ttr']); + $this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']); $this->assertSame($options['tube_name'], $connection->getTube()); } @@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs() $connection->ack((string) $id); } - public function testReject() + /** + * @testWith [false, false] + * [false, true] + * [true, true] + */ + public function testReject(bool $buryOnReject, bool $forceDelete) { $id = 123456; @@ -209,11 +227,42 @@ public function testReject() $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); $client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id)); - $connection = new Connection(['tube_name' => $tube], $client); + $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client); + + $connection->reject((string) $id, null, $forceDelete); + } + + public function testRejectWithBury() + { + $id = 123456; + + $tube = 'baz'; + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024); + + $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client); $connection->reject((string) $id); } + public function testRejectWithBuryAndPriority() + { + $id = 123456; + $priority = 2; + + $tube = 'baz'; + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('useTube')->with($tube)->willReturn($client); + $client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority); + + $connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client); + + $connection->reject((string) $id, $priority); + } + public function testRejectWhenABeanstalkdExceptionOccurs() { $id = 123456; @@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs() $connection->getMessageCount(); } + public function testMessagePriority() + { + $id = 123456; + $priority = 51; + + $tube = 'baz'; + + $response = new ArrayResponse('OK', ['pri' => $priority]); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response); + + $connection = new Connection(['tube_name' => $tube], $client); + + $this->assertSame($priority, $connection->getMessagePriority((string) $id)); + } + + public function testMessagePriorityWhenABeanstalkdExceptionOccurs() + { + $id = 123456; + + $tube = 'baz1234'; + + $exception = new ClientException('foobar error'); + + $client = $this->createMock(PheanstalkInterface::class); + $client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception); + + $connection = new Connection(['tube_name' => $tube], $client); + + $this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception)); + $connection->getMessagePriority((string) $id); + } + public function testSend() { $tube = 'xyz'; diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php index bd716a7d5efe7..03d791b4b2784 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php @@ -14,6 +14,7 @@ use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Exception\LogicException; use Symfony\Component\Messenger\Exception\MessageDecodingFailedException; +use Symfony\Component\Messenger\Stamp\SentForRetryStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface; @@ -48,7 +49,10 @@ public function get(): iterable 'headers' => $beanstalkdEnvelope['headers'], ]); } catch (MessageDecodingFailedException $exception) { - $this->connection->reject($beanstalkdEnvelope['id']); + $this->connection->reject( + $beanstalkdEnvelope['id'], + $this->connection->getMessagePriority($beanstalkdEnvelope['id']), + ); throw $exception; } @@ -68,7 +72,11 @@ public function ack(Envelope $envelope): void public function reject(Envelope $envelope): void { - $this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId()); + $this->connection->reject( + $this->findBeanstalkdReceivedStamp($envelope)->getId(), + $envelope->last(BeanstalkdPriorityStamp::class)?->priority, + $envelope->last(SentForRetryStamp::class)?->isSent ?? false, + ); } public function keepalive(Envelope $envelope, ?int $seconds = null): void diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php index 528bdc9618412..8b2b5f67ba821 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php @@ -32,11 +32,13 @@ class Connection 'tube_name' => PheanstalkInterface::DEFAULT_TUBE, 'timeout' => 0, 'ttr' => 90, + 'bury_on_reject' => false, ]; private string $tube; private int $timeout; private int $ttr; + private bool $buryOnReject; /** * Constructor. @@ -46,6 +48,7 @@ class Connection * * tube_name: name of the tube * * timeout: message reservation timeout (in seconds) * * ttr: the message time to run before it is put back in the ready queue (in seconds) + * * bury_on_reject: bury rejected messages instead of deleting them */ public function __construct( private array $configuration, @@ -55,6 +58,7 @@ public function __construct( $this->tube = $this->configuration['tube_name']; $this->timeout = $this->configuration['timeout']; $this->ttr = $this->configuration['ttr']; + $this->buryOnReject = $this->configuration['bury_on_reject']; } public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self @@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option } $configuration = []; - $configuration += $options + $query + self::DEFAULT_OPTIONS; + foreach (self::DEFAULT_OPTIONS as $k => $v) { + $value = $options[$k] ?? $query[$k] ?? $v; + + $configuration[$k] = match (\gettype($v)) { + 'integer' => filter_var($value, \FILTER_VALIDATE_INT), + 'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL), + default => $value, + }; + } // check for extra keys in options $optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS)); @@ -172,10 +184,14 @@ public function ack(string $id): void } } - public function reject(string $id): void + public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void { try { - $this->client->useTube($this->tube)->delete(new JobId((int) $id)); + if (!$forceDelete && $this->buryOnReject) { + $this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY); + } else { + $this->client->useTube($this->tube)->delete(new JobId((int) $id)); + } } catch (Exception $exception) { throw new TransportException($exception->getMessage(), 0, $exception); } @@ -201,4 +217,15 @@ public function getMessageCount(): int return (int) $tubeStats['current-jobs-ready']; } + + public function getMessagePriority(string $id): int + { + try { + $jobStats = $this->client->statsJob(new JobId((int) $id)); + } catch (Exception $exception) { + throw new TransportException($exception->getMessage(), 0, $exception); + } + + return (int) $jobStats['pri']; + } } diff --git a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json index 549271b4b431a..2c25279b4177d 100644 --- a/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json +++ b/src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json @@ -14,7 +14,7 @@ "require": { "php": ">=8.2", "pda/pheanstalk": "^4.0", - "symfony/messenger": "^7.2" + "symfony/messenger": "^7.3" }, "require-dev": { "symfony/property-access": "^6.4|^7.0", diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index 4d492dfd49524..b703a9dc16aca 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -1,6 +1,11 @@ CHANGELOG ========= +7.3 +--- + + * Add `SentForRetryStamp` that identifies whether a failed message was sent for retry + 7.2 --- diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php index f6334173972af..745902539b5b7 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentForRetryStamp; use Symfony\Component\Messenger\Stamp\StampInterface; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; @@ -82,6 +83,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void } else { $this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]); } + + $event->addStamps(new SentForRetryStamp($shouldRetry)); } /** diff --git a/src/Symfony/Component/Messenger/Stamp/SentForRetryStamp.php b/src/Symfony/Component/Messenger/Stamp/SentForRetryStamp.php new file mode 100644 index 0000000000000..dc99167f683b9 --- /dev/null +++ b/src/Symfony/Component/Messenger/Stamp/SentForRetryStamp.php @@ -0,0 +1,23 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Stamp; + +/** + * Stamp indicating whether a failed message has been sent for retry. + */ +final class SentForRetryStamp implements NonSendableStampInterface +{ + public function __construct( + public readonly bool $isSent, + ) { + } +} diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php index cf3c86d7f4ffb..966458f01e075 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageForRetryListenerTest.php @@ -22,6 +22,7 @@ use Symfony\Component\Messenger\Retry\RetryStrategyInterface; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; +use Symfony\Component\Messenger\Stamp\SentForRetryStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; @@ -40,6 +41,33 @@ public function testNoRetryStrategyCausesNoRetry() $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertFalse($sentForRetryStamp->isSent); + } + + public function testIsRetryableFalseCausesNoRetry() + { + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->never())->method('send'); + $sendersLocator = new Container(); + $sendersLocator->set('my_receiver', $sender); + $listener = new SendFailedMessageForRetryListener($sendersLocator, new Container()); + + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertFalse($sentForRetryStamp->isSent); } public function testRecoverableStrategyCausesRetry() @@ -74,6 +102,12 @@ public function testRecoverableStrategyCausesRetry() $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertTrue($sentForRetryStamp->isSent); } public function testRecoverableExceptionRetryDelayOverridesStrategy() @@ -144,6 +178,12 @@ public function testEnvelopeIsSentToTransportOnRetry() $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertTrue($sentForRetryStamp->isSent); } public function testEnvelopeIsSentToTransportOnRetryWithExceptionPassedToRetryStrategy() @@ -179,6 +219,12 @@ public function testEnvelopeIsSentToTransportOnRetryWithExceptionPassedToRetrySt $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertTrue($sentForRetryStamp->isSent); } public function testEnvelopeKeepOnlyTheLast10Stamps() @@ -213,6 +259,12 @@ public function testEnvelopeKeepOnlyTheLast10Stamps() $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertTrue($sentForRetryStamp->isSent); } public function testRetriedEnvelopePassesToRetriedEvent() @@ -256,5 +308,11 @@ function (WorkerMessageRetriedEvent $retriedEvent) { $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); $listener->onMessageFailed($event); + + /** @var SentForRetryStamp|null $sentForRetryStamp */ + $sentForRetryStamp = $event->getEnvelope()->last(SentForRetryStamp::class); + + $this->assertInstanceOf(SentForRetryStamp::class, $sentForRetryStamp); + $this->assertTrue($sentForRetryStamp->isSent); } }