Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 01a9fef

Browse filesBrowse files
committed
Adding ConsumedByWorkerStamp as way to mark a message in a "worker context"
1 parent 38f19a9 commit 01a9fef
Copy full SHA for 01a9fef

11 files changed

+41
-19
lines changed

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrineClearEntityManagerMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Clears entity manager after calling all handlers.
@@ -28,7 +28,7 @@ protected function handleForManager(EntityManagerInterface $entityManager, Envel
2828
try {
2929
return $stack->next()->handle($envelope, $stack);
3030
} finally {
31-
if (null !== $envelope->last(ReceivedStamp::class)) {
31+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
3232
$entityManager->clear();
3333
}
3434
}

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrineCloseConnectionMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Closes connection and therefore saves number of connections.
@@ -30,7 +30,7 @@ protected function handleForManager(EntityManagerInterface $entityManager, Envel
3030

3131
return $stack->next()->handle($envelope, $stack);
3232
} finally {
33-
if (null !== $envelope->last(ReceivedStamp::class)) {
33+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
3434
$connection->close();
3535
}
3636
}

‎src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
use Doctrine\ORM\EntityManagerInterface;
1515
use Symfony\Component\Messenger\Envelope;
1616
use Symfony\Component\Messenger\Middleware\StackInterface;
17-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
17+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
1818

1919
/**
2020
* Checks whether the connection is still open or reconnects otherwise.
@@ -25,7 +25,7 @@ class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
2525
{
2626
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
2727
{
28-
if (null !== $envelope->last(ReceivedStamp::class)) {
28+
if (null !== $envelope->last(ConsumedByWorkerStamp::class)) {
2929
$this->pingConnection($entityManager);
3030
}
3131

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineClearEntityManagerMiddlewareTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
use Symfony\Bridge\Doctrine\Messenger\DoctrineClearEntityManagerMiddleware;
1717
use Symfony\Component\Messenger\Envelope;
1818
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
19-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
19+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2020
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2121

2222
class DoctrineClearEntityManagerMiddlewareTest extends MiddlewareTestCase
@@ -36,7 +36,7 @@ public function testMiddlewareClearEntityManager()
3636
$middleware = new DoctrineClearEntityManagerMiddleware($managerRegistry, 'default');
3737

3838
$envelope = new Envelope(new \stdClass(), [
39-
new ReceivedStamp('async'),
39+
new ConsumedByWorkerStamp(),
4040
]);
4141
$middleware->handle($envelope, $this->getStackMock());
4242
}

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrineCloseConnectionMiddlewareTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use Symfony\Bridge\Doctrine\Messenger\DoctrineCloseConnectionMiddleware;
1818
use Symfony\Component\Messenger\Envelope;
1919
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
20-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
20+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2121
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2222

2323
class DoctrineCloseConnectionMiddlewareTest extends MiddlewareTestCase
@@ -51,7 +51,7 @@ public function testMiddlewareCloseConnection()
5151
;
5252

5353
$envelope = new Envelope(new \stdClass(), [
54-
new ReceivedStamp('async'),
54+
new ConsumedByWorkerStamp(),
5555
]);
5656
$this->middleware->handle($envelope, $this->getStackMock());
5757
}

‎src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/Tests/Messenger/DoctrinePingConnectionMiddlewareTest.php
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
use Symfony\Bridge\Doctrine\Messenger\DoctrinePingConnectionMiddleware;
1818
use Symfony\Component\Messenger\Envelope;
1919
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
20-
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
20+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2121
use Symfony\Component\Messenger\Test\Middleware\MiddlewareTestCase;
2222

2323
class DoctrinePingConnectionMiddlewareTest extends MiddlewareTestCase
@@ -58,7 +58,7 @@ public function testMiddlewarePingOk()
5858
;
5959

6060
$envelope = new Envelope(new \stdClass(), [
61-
new ReceivedStamp('async'),
61+
new ConsumedByWorkerStamp(),
6262
]);
6363
$this->middleware->handle($envelope, $this->getStackMock());
6464
}
@@ -75,7 +75,7 @@ public function testMiddlewarePingResetEntityManager()
7575
;
7676

7777
$envelope = new Envelope(new \stdClass(), [
78-
new ReceivedStamp('async'),
78+
new ConsumedByWorkerStamp(),
7979
]);
8080
$this->middleware->handle($envelope, $this->getStackMock());
8181
}

‎src/Symfony/Bridge/Doctrine/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Bridge/Doctrine/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"symfony/dependency-injection": "^3.4|^4.0|^5.0",
3030
"symfony/form": "^4.4|^5.0",
3131
"symfony/http-kernel": "^3.4|^4.0|^5.0",
32-
"symfony/messenger": "^4.3|^5.0",
32+
"symfony/messenger": "^4.4|^5.0",
3333
"symfony/property-access": "^3.4|^4.0|^5.0",
3434
"symfony/property-info": "^3.4|^4.0|^5.0",
3535
"symfony/proxy-manager-bridge": "^3.4|^4.0|^5.0",
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Stamp;
13+
14+
/**
15+
* A marker that this message was consumed by a worker process.
16+
*/
17+
class ConsumedByWorkerStamp implements NonSendableStampInterface
18+
{
19+
}

‎src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/AmqpExtIntegrationTest.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ public function testItReceivesSignals()
174174
Get envelope with message: Symfony\Component\Messenger\Tests\Fixtures\DummyMessage
175175
with stamps: [
176176
"Symfony\\Component\\Messenger\\Transport\\AmqpExt\\AmqpReceivedStamp",
177-
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp"
177+
"Symfony\\Component\\Messenger\\Stamp\\ReceivedStamp",
178+
"Symfony\\Component\\Messenger\\Stamp\\ConsumedByWorkerStamp"
178179
]
179180
Done.
180181

‎src/Symfony/Component/Messenger/Tests/WorkerTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/WorkerTest.php
+4-3Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
2222
use Symfony\Component\Messenger\MessageBusInterface;
2323
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
24+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2425
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2526
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2627
use Symfony\Component\Messenger\Stamp\SentStamp;
@@ -47,11 +48,11 @@ public function testWorkerDispatchTheReceivedMessage()
4748
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
4849

4950
$bus->expects($this->at(0))->method('dispatch')->with(
50-
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport')])
51+
$envelope = new Envelope($apiMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
5152
)->willReturn($envelope);
5253

5354
$bus->expects($this->at(1))->method('dispatch')->with(
54-
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport')])
55+
$envelope = new Envelope($ipaMessage, [new ReceivedStamp('transport'), new ConsumedByWorkerStamp()])
5556
)->willReturn($envelope);
5657

5758
$worker = new Worker(['transport' => $receiver], $bus);
@@ -69,7 +70,7 @@ public function testWorkerDoesNotWrapMessagesAlreadyWrappedWithReceivedMessage()
6970
{
7071
$envelope = new Envelope(new DummyMessage('API'));
7172
$receiver = new DummyReceiver([[$envelope]]);
72-
$envelope = $envelope->with(new ReceivedStamp('transport'));
73+
$envelope = $envelope->with(new ReceivedStamp('transport'), new ConsumedByWorkerStamp());
7374

7475
$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();
7576
$bus->expects($this->at(0))->method('dispatch')->with($envelope)->willReturn($envelope);

‎src/Symfony/Component/Messenger/Worker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Worker.php
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
2222
use Symfony\Component\Messenger\Exception\UnrecoverableExceptionInterface;
2323
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
24+
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
2425
use Symfony\Component\Messenger\Stamp\DelayStamp;
2526
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
2627
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
@@ -132,7 +133,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver,
132133
];
133134

134135
try {
135-
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName)));
136+
$envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
136137
} catch (\Throwable $throwable) {
137138
$rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
138139
if ($rejectFirst) {

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.