Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d03cbd3

Browse filesBrowse files
committed
[Messenger] Add bury_on_reject option to Beanstalkd bridge
1 parent 3d1adcb commit d03cbd3
Copy full SHA for d03cbd3

File tree

5 files changed

+94
-11
lines changed
Filter options

5 files changed

+94
-11
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md
+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ CHANGELOG
55
---
66

77
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
8+
* Add `bury_on_reject` option to bury failed messages instead of deleting them
89

910
7.2
1011
---

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php
+29
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1919
use Symfony\Component\Messenger\Envelope;
2020
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
21+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
2122
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2223
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2324
use Symfony\Component\Serializer as SerializerComponent;
@@ -79,6 +80,34 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7980
$receiver->get();
8081
}
8182

83+
/**
84+
* @dataProvider provideRejectCases
85+
*/
86+
public function testReject(array $stamps, bool $forceDelete)
87+
{
88+
$serializer = $this->createSerializer();
89+
90+
$id = 'some id';
91+
92+
$connection = $this->createMock(Connection::class);
93+
$connection->expects($this->once())->method('reject')->with($id, $forceDelete);
94+
95+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
96+
foreach ($stamps as $stamp) {
97+
$envelope = $envelope->with($stamp);
98+
}
99+
100+
$receiver = new BeanstalkdReceiver($connection, $serializer);
101+
$receiver->reject($envelope);
102+
}
103+
104+
public static function provideRejectCases(): iterable
105+
{
106+
yield 'No stamp' => [[], false];
107+
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], true];
108+
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], false];
109+
}
110+
82111
public function testKeepalive()
83112
{
84113
$serializer = $this->createSerializer();

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php
+40-7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public function testFromDsn()
4747
$this->assertSame('default', $configuration['tube_name']);
4848
$this->assertSame(0, $configuration['timeout']);
4949
$this->assertSame(90, $configuration['ttr']);
50+
$this->assertFalse($configuration['bury_on_reject']);
5051

5152
$this->assertEquals(
5253
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
@@ -58,22 +59,32 @@ public function testFromDsn()
5859
$this->assertSame('default', $configuration['tube_name']);
5960
$this->assertSame(0, $configuration['timeout']);
6061
$this->assertSame(90, $configuration['ttr']);
62+
$this->assertFalse($configuration['bury_on_reject']);
6163
$this->assertSame('default', $connection->getTube());
6264
}
6365

6466
public function testFromDsnWithOptions()
6567
{
6668
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
69+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]),
70+
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
6971
);
7072

71-
$configuration = $connection->getConfiguration();
73+
$configuration = $connectionWithOptions->getConfiguration();
7274

7375
$this->assertSame('foo', $configuration['tube_name']);
7476
$this->assertSame(10, $configuration['timeout']);
7577
$this->assertSame(5000, $configuration['ttr']);
76-
$this->assertSame('foo', $connection->getTube());
78+
$this->assertTrue($configuration['bury_on_reject']);
79+
$this->assertSame('foo', $connectionWithOptions->getTube());
80+
81+
$configuration = $connectionWithQuery->getConfiguration();
82+
83+
$this->assertSame('foo', $configuration['tube_name']);
84+
$this->assertSame(10, $configuration['timeout']);
85+
$this->assertSame(5000, $configuration['ttr']);
86+
$this->assertTrue($configuration['bury_on_reject']);
87+
$this->assertSame('foo', $connectionWithOptions->getTube());
7788
}
7889

7990
public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8293
'tube_name' => 'bar',
8394
'timeout' => 20,
8495
'ttr' => 6000,
96+
'bury_on_reject' => false,
8597
];
8698

8799
$this->assertEquals(
88100
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
101+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options)
90102
);
91103

92104
$configuration = $connection->getConfiguration();
93105

94106
$this->assertSame($options['tube_name'], $configuration['tube_name']);
95107
$this->assertSame($options['timeout'], $configuration['timeout']);
96108
$this->assertSame($options['ttr'], $configuration['ttr']);
109+
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
97110
$this->assertSame($options['tube_name'], $connection->getTube());
98111
}
99112

@@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
199212
$connection->ack((string) $id);
200213
}
201214

202-
public function testReject()
215+
/**
216+
* @testWith [false, false]
217+
* [false, true]
218+
* [true, true]
219+
*/
220+
public function testReject(bool $buryOnReject, bool $forceDelete)
203221
{
204222
$id = 123456;
205223

@@ -209,7 +227,22 @@ public function testReject()
209227
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
210228
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
211229

212-
$connection = new Connection(['tube_name' => $tube], $client);
230+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231+
232+
$connection->reject((string) $id, $forceDelete);
233+
}
234+
235+
public function testRejectWithBury()
236+
{
237+
$id = 123456;
238+
239+
$tube = 'baz';
240+
241+
$client = $this->createMock(PheanstalkInterface::class);
242+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
244+
245+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
213246

214247
$connection->reject((string) $id);
215248
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php
+5-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
1718
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1819
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1920
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
@@ -62,7 +63,10 @@ public function ack(Envelope $envelope): void
6263

6364
public function reject(Envelope $envelope): void
6465
{
65-
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
66+
$this->connection->reject(
67+
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
68+
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
69+
);
6670
}
6771

6872
public function keepalive(Envelope $envelope, ?int $seconds = null): void

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+19-3
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,13 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' => 0,
3434
'ttr' => 90,
35+
'bury_on_reject' => false,
3536
];
3637

3738
private string $tube;
3839
private int $timeout;
3940
private int $ttr;
41+
private bool $buryOnReject;
4042

4143
/**
4244
* Constructor.
@@ -46,6 +48,7 @@ class Connection
4648
* * tube_name: name of the tube
4749
* * timeout: message reservation timeout (in seconds)
4850
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
51+
* * bury_on_reject: bury rejected messages instead of deleting them
4952
*/
5053
public function __construct(
5154
private array $configuration,
@@ -55,6 +58,7 @@ public function __construct(
5558
$this->tube = $this->configuration['tube_name'];
5659
$this->timeout = $this->configuration['timeout'];
5760
$this->ttr = $this->configuration['ttr'];
61+
$this->buryOnReject = $this->configuration['bury_on_reject'];
5862
}
5963

6064
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
@@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7478
}
7579

7680
$configuration = [];
77-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
81+
foreach (self::DEFAULT_OPTIONS as $k => $v) {
82+
$value = $options[$k] ?? $query[$k] ?? $v;
83+
84+
$configuration[$k] = match (\gettype($v)) {
85+
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
86+
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
87+
default => $value,
88+
};
89+
}
7890

7991
// check for extra keys in options
8092
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
@@ -172,10 +184,14 @@ public function ack(string $id): void
172184
}
173185
}
174186

175-
public function reject(string $id): void
187+
public function reject(string $id, bool $forceDelete = false): void
176188
{
177189
try {
178-
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
190+
if (!$forceDelete && $this->buryOnReject) {
191+
$this->client->useTube($this->tube)->bury(new JobId((int) $id));
192+
} else {
193+
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
194+
}
179195
} catch (Exception $exception) {
180196
throw new TransportException($exception->getMessage(), 0, $exception);
181197
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.