Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] No need for retry to require SentStamp #32053

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public function testRetryMechanism()
$senderAndReceiver = new DummySenderAndReceiver();

$senderLocator = $this->createMock(ContainerInterface::class);
$senderLocator->method('has')->with('sender_alias')->willReturn(true);
$senderLocator->method('get')->with('sender_alias')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['sender_alias']], $senderLocator);
$senderLocator->method('has')->with('transportName')->willReturn(true);
$senderLocator->method('get')->with('transportName')->willReturn($senderAndReceiver);
$senderLocator = new SendersLocator([DummyMessage::class => ['transportName']], $senderLocator);

$handler = new DummyMessageHandlerFailingFirstTimes(0);
$throwingHandler = new DummyMessageHandlerFailingFirstTimes(1);
Expand All @@ -52,7 +52,7 @@ public function testRetryMechanism()
$envelope = new Envelope(new DummyMessage('API'));
$bus->dispatch($envelope);

$worker = new Worker(['receiverName' => $senderAndReceiver], $bus, ['receiverName' => new MultiplierRetryStrategy()]);
$worker = new Worker(['transportName' => $senderAndReceiver], $bus, ['transportName' => new MultiplierRetryStrategy()]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
if (null === $envelope) {
$worker->stop();
Expand Down
12 changes: 6 additions & 6 deletions 12 src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
public function testDispatchCausesRetry()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
Expand All @@ -97,7 +97,7 @@ public function testDispatchCausesRetry()
$this->assertNotNull($redeliveryStamp);
// retry count now at 1
$this->assertSame(1, $redeliveryStamp->getRetryCount());
$this->assertSame('sender_alias', $redeliveryStamp->getSenderClassOrAlias());
$this->assertSame('transport1', $redeliveryStamp->getSenderClassOrAlias());

// received stamp is removed
$this->assertNull($envelope->last(ReceivedStamp::class));
Expand All @@ -108,7 +108,7 @@ public function testDispatchCausesRetry()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(true);

$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
Expand All @@ -123,7 +123,7 @@ public function testDispatchCausesRetry()
public function testDispatchCausesRejectWhenNoRetry()
{
$receiver = new DummyReceiver([
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'sender_alias')])],
[new Envelope(new DummyMessage('Hello'), [new SentStamp('Some\Sender', 'transport1')])],
]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
Expand All @@ -132,7 +132,7 @@ public function testDispatchCausesRejectWhenNoRetry()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->once())->method('isRetryable')->willReturn(false);

$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
Expand All @@ -155,7 +155,7 @@ public function testDispatchCausesRejectOnUnrecoverableMessage()
$retryStrategy = $this->getMockBuilder(RetryStrategyInterface::class)->getMock();
$retryStrategy->expects($this->never())->method('isRetryable');

$worker = new Worker(['receiver1' => $receiver], $bus, ['receiver1' => $retryStrategy]);
$worker = new Worker(['transport1' => $receiver], $bus, ['transport1' => $retryStrategy]);
$worker->run([], function (?Envelope $envelope) use ($worker) {
// stop after the messages finish
if (null === $envelope) {
Expand Down
26 changes: 1 addition & 25 deletions 26 src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

Expand Down Expand Up @@ -150,7 +148,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,

// add the delay and retry stamp info + remove ReceivedStamp
$retryEnvelope = $envelope->with(new DelayStamp($delay))
->with(new RedeliveryStamp($retryCount, $this->getSenderClassOrAlias($envelope)))
->with(new RedeliveryStamp($retryCount, $transportName))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The edge case I believe I was coding for (and maybe we decide it's not legitimate) is if you send on transport/sender A but receive on transport/receiver B. It's an odd case, but in that situation, we would (I think?) want to re-send to sender A, not receiver B. Thoughts?

Copy link
Contributor Author

@Tobion Tobion Jun 18, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory you could receive a message from transport B but route it to transport A for sending.
But in the end, senders and receivers are bound together by TransportInterface which requires implementing both. So if you do the above, you still have to implement sending part in transport B. So you could just use the same sender in transport B that you use in transport A if you need to cover retry going to transport A.
So to me that is an edge case that people can solve in custom transports. Nothing we should take care of by default. And in case we want to account for this later, the better and more explicit solution to me is to add a config option on the transport to configure the retry_transport (similar to the failure_transport).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is if you send on transport/sender A but receive on transport/receiver B

I also don't see how SentStamp should work for this case anyway. If a message comes from transport B, it cannot have the SentStamp pointing to transport A. If it was sent to transport B, it also has the SentStamp pointing to B (at least the last SentStamp) added by SendMessageMiddleware.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't see how SentStamp should work for this case anyway

It would work like this:

A) Send a message on TransportA. The message now has a SentStamp for TransportA.
B) Receive message on TransportB. The message will, of course, still only have the SentStamp for TransportA.

It's an edge case where, for example, you're using AMQP and send to TransportA, which has some routing & binding keys that ultimately put it in some queue "foo". Then, for whatever reason, you setup TransportB to receive from queue "foo". In this case, you will directly "receive" a message from TransportB that only has a SentStamp for TransportA.

But in the end, senders and receivers are bound together by TransportInterface which requires implementing both

I think that's not true... at least on a low, component level. I believe that, when routing, you can set the class to route to any service id that has the SenderInterface, even if it's not defined as a transport (and doesn't define a TransportInterface). I think the same is true for receiving messages: I think you can pass any service id to messenger:consume.

Sorry to complicate things - this is what "possible" situation that was floating around when this was originally coded.

The tl;dr is this: the SentStamp is the concrete way to mark exactly which sender sent a message so that we can definitely use the same sender during redelivery. But yes, we could add a retry_transport option, which could act as an override for SentStamp/be used when that stamp is not available. Or, if the stamp is not available, we could "try" to redeliver to the "receiver" (and throw an exception if it doesn't implement SenderInterface).

Copy link
Contributor Author

@Tobion Tobion Jun 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to not have the SentStamp logic. We agreed it tries to solve an edge case. To me that is something SF Messenger should not code against. It potentially breaks as much as it solves. For example, if I sent a Message to transport A, rename the transport to B and deploy again. Suddenly the retry of all existing messages in the queue breaks because it still tries to resend to A but A does not exist anymore. And with delayed messages of hours or days, this can easily happen.

How do we proceed now?

  1. accept the PR to remove SentStamp logic from retry in 4.3
  2. try to implement both the SentStamp logic and fallback to receiver
  3. as 2) but remove the SentStamp logic only in 4.4 to avoid potential bc break in 4.3

I'm in favor of 1).
2) would make the messenger behavior context dependent which I don't like at all.

->withoutAll(ReceivedStamp::class);

// re-send the message
Expand Down Expand Up @@ -197,28 +195,6 @@ private function shouldRetry(\Throwable $e, Envelope $envelope, RetryStrategyInt
return false;
}

$sentStamp = $envelope->last(SentStamp::class);
if (null === $sentStamp) {
if (null !== $this->logger) {
$this->logger->warning('Message will not be retried because the SentStamp is missing and so the target sender cannot be determined.');
}

return false;
}

return $retryStrategy->isRetryable($envelope);
}

private function getSenderClassOrAlias(Envelope $envelope): string
{
/** @var SentStamp|null $sentStamp */
$sentStamp = $envelope->last(SentStamp::class);

if (null === $sentStamp) {
// should not happen, because of the check in shouldRetry()
throw new LogicException('Could not find SentStamp.');
}

return $sentStamp->getSenderAlias() ?: $sentStamp->getSenderClass();
}
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.