Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9f4b8a4

Browse filesBrowse files
HypeMCfabpot
authored andcommitted
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent d4566b2 commit 9f4b8a4
Copy full SHA for 9f4b8a4

File tree

10 files changed

+254
-14
lines changed
Filter options

10 files changed

+254
-14
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add `bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php
+33-1Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1920
use Symfony\Component\Messenger\Envelope;
2021
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
22+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2123
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
2224
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2325
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -81,12 +83,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8183
$beanstalkdEnvelope = $this->createBeanstalkdEnvelope();
8284
$connection = $this->createMock(Connection::class);
8385
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
84-
$connection->expects($this->once())->method('reject');
86+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
87+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2);
8588

8689
$receiver = new BeanstalkdReceiver($connection, $serializer);
8790
$receiver->get();
8891
}
8992

93+
/**
94+
* @dataProvider provideRejectCases
95+
*/
96+
public function testReject(array $stamps, ?int $priority, bool $forceDelete)
97+
{
98+
$serializer = $this->createSerializer();
99+
100+
$id = 'some id';
101+
102+
$connection = $this->createMock(Connection::class);
103+
$connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete);
104+
105+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
106+
foreach ($stamps as $stamp) {
107+
$envelope = $envelope->with($stamp);
108+
}
109+
110+
$receiver = new BeanstalkdReceiver($connection, $serializer);
111+
$receiver->reject($envelope);
112+
}
113+
114+
public static function provideRejectCases(): iterable
115+
{
116+
yield 'No stamp' => [[], null, false];
117+
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true];
118+
yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true];
119+
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false];
120+
}
121+
90122
public function testKeepalive()
91123
{
92124
$serializer = $this->createSerializer();

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php
+90-7Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default', $configuration['tube_name']);
4848
$this->assertSame(0, $configuration['timeout']);
4949
$this->assertSame(90, $configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default', $configuration['tube_name']);
5960
$this->assertSame(0, $configuration['timeout']);
6061
$this->assertSame(90, $configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default', $connection->getTube());
6264
}
6365

6466
public function testFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]),
70+
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration = $connection->getConfiguration();
73+
$configuration = $connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo', $configuration['tube_name']);
7476
$this->assertSame(10, $configuration['timeout']);
7577
$this->assertSame(5000, $configuration['ttr']);
76-
$this->assertSame('foo', $connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo', $connectionWithOptions->getTube());
80+
81+
$configuration = $connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo', $configuration['tube_name']);
84+
$this->assertSame(10, $configuration['timeout']);
85+
$this->assertSame(5000, $configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo', $connectionWithOptions->getTube());
7788
}
7889

7990
public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' => 'bar',
8394
'timeout' => 20,
8495
'ttr' => 6000,
96+
'bury_on_reject' => false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options)
90102
);
91103

92104
$configuration = $connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'], $configuration['tube_name']);
95107
$this->assertSame($options['timeout'], $configuration['timeout']);
96108
$this->assertSame($options['ttr'], $configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'], $connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string) $id);
200213
}
201214

202-
public function testReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
public function testReject(bool $buryOnReject, bool $forceDelete)
203221
{
204222
$id = 123456;
205223

@@ -209,11 +227,42 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
211229

212-
$connection = new Connection(['tube_name' => $tube], $client);
230+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231+
232+
$connection->reject((string) $id, null, $forceDelete);
233+
}
234+
235+
public function testRejectWithBury()
236+
{
237+
$id = 123456;
238+
239+
$tube = 'baz';
240+
241+
$client = $this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);
244+
245+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
213246

214247
$connection->reject((string) $id);
215248
}
216249

250+
public function testRejectWithBuryAndPriority()
251+
{
252+
$id = 123456;
253+
$priority = 2;
254+
255+
$tube = 'baz';
256+
257+
$client = $this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
260+
261+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
262+
263+
$connection->reject((string) $id, $priority);
264+
}
265+
217266
public function testRejectWhenABeanstalkdExceptionOccurs()
218267
{
219268
$id = 123456;
@@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
263312
$connection->getMessageCount();
264313
}
265314

315+
public function testMessagePriority()
316+
{
317+
$id = 123456;
318+
$priority = 51;
319+
320+
$tube = 'baz';
321+
322+
$response = new ArrayResponse('OK', ['pri' => $priority]);
323+
324+
$client = $this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);
326+
327+
$connection = new Connection(['tube_name' => $tube], $client);
328+
329+
$this->assertSame($priority, $connection->getMessagePriority((string) $id));
330+
}
331+
332+
public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id = 123456;
335+
336+
$tube = 'baz1234';
337+
338+
$exception = new ClientException('foobar error');
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
342+
343+
$connection = new Connection(['tube_name' => $tube], $client);
344+
345+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
346+
$connection->getMessagePriority((string) $id);
347+
}
348+
266349
public function testSend()
267350
{
268351
$tube = 'xyz';

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php
+10-2Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
1819
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1920
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
@@ -48,7 +49,10 @@ public function get(): iterable
4849
'headers' => $beanstalkdEnvelope['headers'],
4950
]);
5051
} catch (MessageDecodingFailedException $exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
52+
$this->connection->reject(
53+
$beanstalkdEnvelope['id'],
54+
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
55+
);
5256

5357
throw $exception;
5458
}
@@ -68,7 +72,11 @@ public function ack(Envelope $envelope): void
6872

6973
public function reject(Envelope $envelope): void
7074
{
71-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
75+
$this->connection->reject(
76+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
77+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
78+
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
79+
);
7280
}
7381

7482
public function keepalive(Envelope $envelope, ?int $seconds = null): void

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+30-3Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' => 0,
3434
'ttr' => 90,
35+
'bury_on_reject' => false,
3536
];
3637

3738
private string $tube;
3839
private int $timeout;
3940
private int $ttr;
41+
private bool $buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
public function __construct(
5154
private array $configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube = $this->configuration['tube_name'];
5659
$this->timeout = $this->configuration['timeout'];
5760
$this->ttr = $this->configuration['ttr'];
61+
$this->buryOnReject = $this->configuration['bury_on_reject'];
5862
}
5963

6064
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONS as $k => $v) {
82+
$value = $options[$k] ?? $query[$k] ?? $v;
83+
84+
$configuration[$k] = match (\gettype($v)) {
85+
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default => $value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
public function reject(string $id): void
187+
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
190+
if (!$forceDelete && $this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
192+
} else {
193+
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
194+
}
179195
} catch (Exception $exception) {
180196
throw new TransportException($exception->getMessage(), 0, $exception);
181197
}
@@ -201,4 +217,15 @@ public function getMessageCount(): int
201217

202218
return (int) $tubeStats['current-jobs-ready'];
203219
}
220+
221+
public function getMessagePriority(string $id): int
222+
{
223+
try {
224+
$jobStats = $this->client->statsJob(new JobId((int) $id));
225+
} catch (Exception $exception) {
226+
throw new TransportException($exception->getMessage(), 0, $exception);
227+
}
228+
229+
return (int) $jobStats['pri'];
230+
}
204231
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^7.2"
17+
"symfony/messenger": "^7.3"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

‎src/Symfony/Component/Messenger/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/CHANGELOG.md
+5Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.3
5+
---
6+
7+
* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry
8+
49
7.2
510
---
611

‎src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/EventListener/SendFailedMessageForRetryListener.php
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
2626
use Symfony\Component\Messenger\Stamp\DelayStamp;
2727
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
28+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2829
use Symfony\Component\Messenger\Stamp\StampInterface;
2930
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
3031
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
@@ -82,6 +83,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
8283
} else {
8384
$this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
8485
}
86+
87+
$event->addStamps(new SentForRetryStamp($shouldRetry));
8588
}
8689

8790
/**

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.