Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 133ce2b

Browse filesBrowse files
committed
[Messenger] Add priority and bury_on_reject options to Beanstalkd bridge
1 parent e2d5c4a commit 133ce2b
Copy full SHA for 133ce2b

File tree

3 files changed

+71
-10
lines changed
Filter options

3 files changed

+71
-10
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md
+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
6.3
5+
---
6+
7+
* Add `priority` and `bury_on_reject` options
8+
49
5.2.0
510
-----
611

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php
+43-7
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ public function testFromDsn()
4747
$this->assertSame('default', $configuration['tube_name']);
4848
$this->assertSame(0, $configuration['timeout']);
4949
$this->assertSame(90, $configuration['ttr']);
50+
$this->assertSame(PheanstalkInterface::DEFAULT_PRIORITY, $configuration['priority']);
51+
$this->assertFalse($configuration['bury_on_reject']);
5052

5153
$this->assertEquals(
5254
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
@@ -58,22 +60,35 @@ public function testFromDsn()
5860
$this->assertSame('default', $configuration['tube_name']);
5961
$this->assertSame(0, $configuration['timeout']);
6062
$this->assertSame(90, $configuration['ttr']);
63+
$this->assertSame(PheanstalkInterface::DEFAULT_PRIORITY, $configuration['priority']);
64+
$this->assertFalse($configuration['bury_on_reject']);
6165
$this->assertSame('default', $connection->getTube());
6266
}
6367

6468
public function testFromDsnWithOptions()
6569
{
6670
$this->assertEquals(
67-
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
68-
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
71+
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'priority' => 300, 'bury_on_reject' => true]),
72+
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&priority=300&bury_on_reject=true')
6973
);
7074

71-
$configuration = $connection->getConfiguration();
75+
$configuration = $connectionWithOptions->getConfiguration();
76+
77+
$this->assertSame('foo', $configuration['tube_name']);
78+
$this->assertSame(10, $configuration['timeout']);
79+
$this->assertSame(5000, $configuration['ttr']);
80+
$this->assertSame(300, $configuration['priority']);
81+
$this->assertTrue($configuration['bury_on_reject']);
82+
$this->assertSame('foo', $connectionWithOptions->getTube());
83+
84+
$configuration = $connectionWithQuery->getConfiguration();
7285

7386
$this->assertSame('foo', $configuration['tube_name']);
7487
$this->assertSame(10, $configuration['timeout']);
7588
$this->assertSame(5000, $configuration['ttr']);
76-
$this->assertSame('foo', $connection->getTube());
89+
$this->assertSame(300, $configuration['priority']);
90+
$this->assertTrue($configuration['bury_on_reject']);
91+
$this->assertSame('foo', $connectionWithOptions->getTube());
7792
}
7893

7994
public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
@@ -82,18 +97,22 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
8297
'tube_name' => 'bar',
8398
'timeout' => 20,
8499
'ttr' => 6000,
100+
'priority' => 300,
101+
'bury_on_reject' => false,
85102
];
86103

87104
$this->assertEquals(
88105
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
89-
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
106+
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&priority=500&bury_on_reject=true', $options)
90107
);
91108

92109
$configuration = $connection->getConfiguration();
93110

94111
$this->assertSame($options['tube_name'], $configuration['tube_name']);
95112
$this->assertSame($options['timeout'], $configuration['timeout']);
96113
$this->assertSame($options['ttr'], $configuration['ttr']);
114+
$this->assertSame($options['priority'], $configuration['priority']);
115+
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
97116
$this->assertSame($options['tube_name'], $connection->getTube());
98117
}
99118

@@ -214,6 +233,22 @@ public function testReject()
214233
$connection->reject((string) $id);
215234
}
216235

236+
public function testRejectWithBury()
237+
{
238+
$id = 123456;
239+
240+
$tube = 'baz';
241+
$priority = 300;
242+
243+
$client = $this->createMock(PheanstalkInterface::class);
244+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
245+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
246+
247+
$connection = new Connection(['tube_name' => $tube, 'priority' => $priority, 'bury_on_reject' => true], $client);
248+
249+
$connection->reject((string) $id);
250+
}
251+
217252
public function testRejectWhenABeanstalkdExceptionOccurs()
218253
{
219254
$id = 123456;
@@ -266,6 +301,7 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
266301
public function testSend()
267302
{
268303
$tube = 'xyz';
304+
$priority = 300;
269305

270306
$body = 'foo';
271307
$headers = ['test' => 'bar'];
@@ -285,12 +321,12 @@ public function testSend()
285321

286322
return $expectedMessage === $data;
287323
}),
288-
1024,
324+
$priority,
289325
$expectedDelay,
290326
90
291327
)->willReturn(new Job($id, 'foobar'));
292328

293-
$connection = new Connection(['tube_name' => $tube], $client);
329+
$connection = new Connection(['tube_name' => $tube, 'priority' => $priority], $client);
294330

295331
$returnedId = $connection->send($body, $headers, $delay);
296332

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+23-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class Connection
3232
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
3333
'timeout' => 0,
3434
'ttr' => 90,
35+
'priority' => PheanstalkInterface::DEFAULT_PRIORITY,
36+
'bury_on_reject' => false,
3537
];
3638

3739
/**
@@ -40,12 +42,16 @@ class Connection
4042
* * tube_name: name of the tube
4143
* * timeout: message reservation timeout (in seconds)
4244
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
45+
* * priority: priority by which the message will be reserved, from 0 (most urgent) to 0xFFFFFFFF (least urgent)
46+
* * bury_on_reject: bury rejected messages instead of deleting them
4347
*/
4448
private array $configuration;
4549
private PheanstalkInterface $client;
4650
private string $tube;
4751
private int $timeout;
4852
private int $ttr;
53+
private int $priority;
54+
private bool $buryOnReject;
4955

5056
public function __construct(array $configuration, PheanstalkInterface $client)
5157
{
@@ -54,6 +60,8 @@ public function __construct(array $configuration, PheanstalkInterface $client)
5460
$this->tube = $this->configuration['tube_name'];
5561
$this->timeout = $this->configuration['timeout'];
5662
$this->ttr = $this->configuration['ttr'];
63+
$this->priority = $this->configuration['priority'];
64+
$this->buryOnReject = $this->configuration['bury_on_reject'];
5765
}
5866

5967
public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
@@ -73,7 +81,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
7381
}
7482

7583
$configuration = [];
76-
$configuration += $options + $query + self::DEFAULT_OPTIONS;
84+
foreach (self::DEFAULT_OPTIONS as $k => $v) {
85+
$value = $options[$k] ?? $query[$k] ?? $v;
86+
87+
$configuration[$k] = match (\gettype($v)) {
88+
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
89+
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
90+
default => $value,
91+
};
92+
}
7793

7894
// check for extra keys in options
7995
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
@@ -122,7 +138,7 @@ public function send(string $body, array $headers, int $delay = 0): string
122138
try {
123139
$job = $this->client->useTube($this->tube)->put(
124140
$message,
125-
PheanstalkInterface::DEFAULT_PRIORITY,
141+
$this->priority,
126142
$delay / 1000,
127143
$this->ttr
128144
);
@@ -173,7 +189,11 @@ public function ack(string $id): void
173189
public function reject(string $id): void
174190
{
175191
try {
176-
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
192+
if ($this->buryOnReject) {
193+
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $this->priority);
194+
} else {
195+
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
196+
}
177197
} catch (Exception $exception) {
178198
throw new TransportException($exception->getMessage(), 0, $exception);
179199
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.