Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] dispatch event when a message is retried #36152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Messenger] dispatch event when a message is retried
  • Loading branch information
nikophil authored and fabpot committed Oct 2, 2020
commit 55bddcb72137b65fc1378c69829fa0163ffeb376
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
abstract_arg('senders service locator'),
service('messenger.retry_strategy_locator'),
service('logger')->ignoreOnInvalid(),
service('event_dispatcher'),
])
->tag('kernel.event_subscriber')
->tag('monolog.logger', ['channel' => 'messenger'])
Expand Down
2 changes: 2 additions & 0 deletions 2 src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ CHANGELOG
* Added `FlattenExceptionNormalizer` to give more information about the exception on Messenger background processes. The `FlattenExceptionNormalizer` has a higher priority than `ProblemNormalizer` and it is only used when the Messenger serialization context is set.
* Added factory methods to `DelayStamp`.
* Removed the exception when dispatching a message with a `DispatchAfterCurrentBusStamp` and not in a context of another dispatch call
* Added `WorkerMessageRetriedEvent`
* Added `WorkerMessageReceivedEvent::setEnvelope()` and made event mutable

5.1.0
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Event;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\StampInterface;

abstract class AbstractWorkerMessageEvent
{
Expand All @@ -36,4 +37,9 @@ public function getReceiverName(): string
{
return $this->receiverName;
}

public function addStamps(StampInterface ...$stamps): void
{
$this->envelope = $this->envelope->with(...$stamps);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Event;

/**
* Dispatched after a message has been sent for retry.
*
* The event name is the class name.
*/
final class WorkerMessageRetriedEvent extends AbstractWorkerMessageEvent
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageRetriedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RecoverableExceptionInterface;
use Symfony\Component\Messenger\Exception\RuntimeException;
Expand All @@ -24,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
* @author Tobias Schultze <http://tobion.de>
Expand All @@ -33,13 +35,15 @@ class SendFailedMessageForRetryListener implements EventSubscriberInterface
private $sendersLocator;
private $retryStrategyLocator;
private $logger;
private $eventDispatcher;
private $historySize;

public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, int $historySize = 10)
public function __construct(ContainerInterface $sendersLocator, ContainerInterface $retryStrategyLocator, LoggerInterface $logger = null, EventDispatcherInterface $eventDispatcher = null, int $historySize = 10)
{
$this->sendersLocator = $sendersLocator;
$this->retryStrategyLocator = $retryStrategyLocator;
$this->logger = $logger;
$this->eventDispatcher = $eventDispatcher;
nikophil marked this conversation as resolved.
Show resolved Hide resolved
$this->historySize = $historySize;
}

Expand Down Expand Up @@ -74,6 +78,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)

// re-send the message for retry
$this->getSenderForTransport($event->getReceiverName())->send($retryEnvelope);

if (null !== $this->eventDispatcher) {
$this->eventDispatcher->dispatch(new WorkerMessageRetriedEvent($retryEnvelope, $event->getReceiverName()));
}
} else {
if (null !== $this->logger) {
$this->logger->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

class SendFailedMessageForRetryListenerTest extends TestCase
{
Expand Down Expand Up @@ -107,7 +108,10 @@ public function testEnvelopeIsSentToTransportOnRetry()
$retryStrategyLocator->expects($this->once())->method('has')->willReturn(true);
$retryStrategyLocator->expects($this->once())->method('get')->willReturn($retryStategy);

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator);
$eventDispatcher = $this->createMock(EventDispatcherInterface::class);
$eventDispatcher->expects($this->once())->method('dispatch');

$listener = new SendFailedMessageForRetryListener($senderLocator, $retryStrategyLocator, null, $eventDispatcher);

$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);

Expand Down
39 changes: 39 additions & 0 deletions 39 src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
Expand Down Expand Up @@ -243,14 +244,45 @@ public function testWorkerWithMultipleReceivers()
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}

public function testWorkerMessageReceivedEventMutability()
{
$envelope = new Envelope(new DummyMessage('Hello'));
$receiver = new DummyReceiver([[$envelope]]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
$bus->method('dispatch')->willReturnArgument(0);

$eventDispatcher = new EventDispatcher();
$eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));

$stamp = new class() implements StampInterface {
};
$listener = function (WorkerMessageReceivedEvent $event) use ($stamp) {
$event->addStamps($stamp);
};

$eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener);

$worker = new Worker([$receiver], $bus, $eventDispatcher);
$worker->run();

$envelope = current($receiver->getAcknowledgedEnvelopes());
$this->assertCount(1, $envelope->all(\get_class($stamp)));
}
}

class DummyReceiver implements ReceiverInterface
{
private $deliveriesOfEnvelopes;
private $acknowledgedEnvelopes;
private $rejectedEnvelopes;
private $acknowledgeCount = 0;
private $rejectCount = 0;

/**
* @param Envelope[][] $deliveriesOfEnvelopes
*/
public function __construct(array $deliveriesOfEnvelopes)
{
$this->deliveriesOfEnvelopes = $deliveriesOfEnvelopes;
Expand All @@ -266,11 +298,13 @@ public function get(): iterable
public function ack(Envelope $envelope): void
{
++$this->acknowledgeCount;
$this->acknowledgedEnvelopes[] = $envelope;
}

public function reject(Envelope $envelope): void
{
++$this->rejectCount;
$this->rejectedEnvelopes[] = $envelope;
}

public function getAcknowledgeCount(): int
Expand All @@ -282,4 +316,9 @@ public function getRejectCount(): int
{
return $this->rejectCount;
}

public function getAcknowledgedEnvelopes(): array
{
return $this->acknowledgedEnvelopes;
}
}
9 changes: 7 additions & 2 deletions 9 src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
{
$event = new WorkerMessageReceivedEvent($envelope, $transportName);
$this->dispatchEvent($event);
$envelope = $event->getEnvelope();
fabpot marked this conversation as resolved.
Show resolved Hide resolved

if (!$event->shouldHandle()) {
return;
Expand All @@ -123,7 +124,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
$envelope = $throwable->getEnvelope();
}

$this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
$failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable);
$this->dispatchEvent($failedEvent);
$envelope = $failedEvent->getEnvelope();

if (!$rejectFirst) {
$receiver->reject($envelope);
Expand All @@ -132,7 +135,9 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
return;
}

$this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
$handledEvent = new WorkerMessageHandledEvent($envelope, $transportName);
$this->dispatchEvent($handledEvent);
$envelope = $handledEvent->getEnvelope();

if (null !== $this->logger) {
$message = $envelope->getMessage();
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.