Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit b706a44

Browse filesBrowse files
committed
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent 3d1adcb commit b706a44
Copy full SHA for b706a44

File tree

5 files changed

+164
-13
lines changed
Filter options

5 files changed

+164
-13
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md
+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add `bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php
+33-1
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1920
use Symfony\Component\Messenger\Envelope;
2021
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
22+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2123
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2224
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2325
use Symfony\Component\Serializer as SerializerComponent;
@@ -73,12 +75,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7375
$beanstalkdEnvelope = $this->createBeanstalkdEnvelope();
7476
$connection = $this->createMock(Connection::class);
7577
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
76-
$connection->expects($this->once())->method('reject');
78+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
79+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2);
7780

7881
$receiver = new BeanstalkdReceiver($connection, $serializer);
7982
$receiver->get();
8083
}
8184

85+
/**
86+
* @dataProvider provideRejectCases
87+
*/
88+
public function testReject(array $stamps, ?int $priority, bool $forceDelete)
89+
{
90+
$serializer = $this->createSerializer();
91+
92+
$id = 'some id';
93+
94+
$connection = $this->createMock(Connection::class);
95+
$connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete);
96+
97+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
98+
foreach ($stamps as $stamp) {
99+
$envelope = $envelope->with($stamp);
100+
}
101+
102+
$receiver = new BeanstalkdReceiver($connection, $serializer);
103+
$receiver->reject($envelope);
104+
}
105+
106+
public static function provideRejectCases(): iterable
107+
{
108+
yield 'No stamp' => [[], null, false];
109+
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true];
110+
yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true];
111+
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false];
112+
}
113+
82114
public function testKeepalive()
83115
{
84116
$serializer = $this->createSerializer();

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php
+90-7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default', $configuration['tube_name']);
4848
$this->assertSame(0, $configuration['timeout']);
4949
$this->assertSame(90, $configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default', $configuration['tube_name']);
5960
$this->assertSame(0, $configuration['timeout']);
6061
$this->assertSame(90, $configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default', $connection->getTube());
6264
}
6365

6466
public function testFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]),
70+
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration = $connection->getConfiguration();
73+
$configuration = $connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo', $configuration['tube_name']);
7476
$this->assertSame(10, $configuration['timeout']);
7577
$this->assertSame(5000, $configuration['ttr']);
76-
$this->assertSame('foo', $connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo', $connectionWithOptions->getTube());
80+
81+
$configuration = $connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo', $configuration['tube_name']);
84+
$this->assertSame(10, $configuration['timeout']);
85+
$this->assertSame(5000, $configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo', $connectionWithOptions->getTube());
7788
}
7889

7990
public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' => 'bar',
8394
'timeout' => 20,
8495
'ttr' => 6000,
96+
'bury_on_reject' => false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options)
90102
);
91103

92104
$configuration = $connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'], $configuration['tube_name']);
95107
$this->assertSame($options['timeout'], $configuration['timeout']);
96108
$this->assertSame($options['ttr'], $configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'], $connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string) $id);
200213
}
201214

202-
public function testReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
public function testReject(bool $buryOnReject, bool $forceDelete)
203221
{
204222
$id = 123456;
205223

@@ -209,11 +227,42 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
211229

212-
$connection = new Connection(['tube_name' => $tube], $client);
230+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231+
232+
$connection->reject((string) $id, null, $forceDelete);
233+
}
234+
235+
public function testRejectWithBury()
236+
{
237+
$id = 123456;
238+
239+
$tube = 'baz';
240+
241+
$client = $this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);
244+
245+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
213246

214247
$connection->reject((string) $id);
215248
}
216249

250+
public function testRejectWithBuryAndPriority()
251+
{
252+
$id = 123456;
253+
$priority = 2;
254+
255+
$tube = 'baz';
256+
257+
$client = $this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
260+
261+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
262+
263+
$connection->reject((string) $id, $priority);
264+
}
265+
217266
public function testRejectWhenABeanstalkdExceptionOccurs()
218267
{
219268
$id = 123456;
@@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
263312
$connection->getMessageCount();
264313
}
265314

315+
public function testMessagePriority()
316+
{
317+
$id = 123456;
318+
$priority = 51;
319+
320+
$tube = 'baz';
321+
322+
$response = new ArrayResponse('OK', ['pri' => $priority]);
323+
324+
$client = $this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);
326+
327+
$connection = new Connection(['tube_name' => $tube], $client);
328+
329+
$this->assertSame($priority, $connection->getMessagePriority((string) $id));
330+
}
331+
332+
public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id = 123456;
335+
336+
$tube = 'baz1234';
337+
338+
$exception = new ClientException('foobar error');
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
342+
343+
$connection = new Connection(['tube_name' => $tube], $client);
344+
345+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
346+
$connection->getMessagePriority((string) $id);
347+
}
348+
266349
public function testSend()
267350
{
268351
$tube = 'xyz';

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php
+10-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -47,7 +48,10 @@ public function get(): iterable
4748
'headers' => $beanstalkdEnvelope['headers'],
4849
]);
4950
} catch (MessageDecodingFailedException $exception) {
50-
$this->connection->reject($beanstalkdEnvelope['id']);
51+
$this->connection->reject(
52+
$beanstalkdEnvelope['id'],
53+
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
54+
);
5155

5256
throw $exception;
5357
}
@@ -62,7 +66,11 @@ public function ack(Envelope $envelope): void
6266

6367
public function reject(Envelope $envelope): void
6468
{
65-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
69+
$this->connection->reject(
70+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
71+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
72+
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
73+
);
6674
}
6775

6876
public function keepalive(Envelope $envelope, ?int $seconds = null): void

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+30-3
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' => 0,
3434
'ttr' => 90,
35+
'bury_on_reject' => false,
3536
];
3637

3738
private string $tube;
3839
private int $timeout;
3940
private int $ttr;
41+
private bool $buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
public function __construct(
5154
private array $configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube = $this->configuration['tube_name'];
5659
$this->timeout = $this->configuration['timeout'];
5760
$this->ttr = $this->configuration['ttr'];
61+
$this->buryOnReject = $this->configuration['bury_on_reject'];
5862
}
5963

6064
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONS as $k => $v) {
82+
$value = $options[$k] ?? $query[$k] ?? $v;
83+
84+
$configuration[$k] = match (\gettype($v)) {
85+
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default => $value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
public function reject(string $id): void
187+
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
190+
if (!$forceDelete && $this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
192+
} else {
193+
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
194+
}
179195
} catch (Exception $exception) {
180196
throw new TransportException($exception->getMessage(), 0, $exception);
181197
}
@@ -201,4 +217,15 @@ public function getMessageCount(): int
201217

202218
return (int) $tubeStats['current-jobs-ready'];
203219
}
220+
221+
public function getMessagePriority(string $id): int
222+
{
223+
try {
224+
$jobStats = $this->client->statsJob(new JobId((int) $id));
225+
} catch (Exception $exception) {
226+
throw new TransportException($exception->getMessage(), 0, $exception);
227+
}
228+
229+
return (int) $jobStats['pri'];
230+
}
204231
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.