Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 7987d87

Browse filesBrowse files
committed
[Messenger] Add BeanstalkdPriorityStamp to Beanstalkd bridge
1 parent 8c73cb3 commit 7987d87
Copy full SHA for 7987d87

File tree

8 files changed

+163
-22
lines changed
Filter options

8 files changed

+163
-22
lines changed

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md
+2-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
CHANGELOG
22
=========
33

4-
6.4
4+
7.1
55
---
66

77
* Add `bury_on_reject` option to bury failed messages instead of deleting them
8+
* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
89

910
5.2.0
1011
-----

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php
+9-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1819
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
@@ -74,7 +75,8 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7475
$beanstalkdEnvelope = $this->createBeanstalkdEnvelope();
7576
$connection = $this->createMock(Connection::class);
7677
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
77-
$connection->expects($this->once())->method('reject');
78+
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
79+
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2);
7880

7981
$receiver = new BeanstalkdReceiver($connection, $serializer);
8082
$receiver->get();
@@ -83,14 +85,14 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
8385
/**
8486
* @dataProvider provideRejectCases
8587
*/
86-
public function testReject(array $stamps, bool $forceDelete)
88+
public function testReject(array $stamps, ?int $priority, bool $forceDelete)
8789
{
8890
$serializer = $this->createSerializer();
8991

9092
$id = 'some id';
9193

9294
$connection = $this->createMock(Connection::class);
93-
$connection->expects($this->once())->method('reject')->with($id, $forceDelete);
95+
$connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete);
9496

9597
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
9698
foreach ($stamps as $stamp) {
@@ -103,9 +105,10 @@ public function testReject(array $stamps, bool $forceDelete)
103105

104106
public static function provideRejectCases(): iterable
105107
{
106-
yield 'No stamp' => [[], false];
107-
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], true];
108-
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], false];
108+
yield 'No stamp' => [[], null, false];
109+
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true];
110+
yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true];
111+
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false];
109112
}
110113

111114
private function createBeanstalkdEnvelope(): array

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdSenderTest.php
+18-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdSender;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -27,7 +28,7 @@ public function testSend()
2728
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
2829

2930
$connection = $this->createMock(Connection::class);
30-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0);
31+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, null);
3132

3233
$serializer = $this->createMock(SerializerInterface::class);
3334
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
@@ -42,7 +43,22 @@ public function testSendWithDelay()
4243
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
4344

4445
$connection = $this->createMock(Connection::class);
45-
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500);
46+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 500, null);
47+
48+
$serializer = $this->createMock(SerializerInterface::class);
49+
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
50+
51+
$sender = new BeanstalkdSender($connection, $serializer);
52+
$sender->send($envelope);
53+
}
54+
55+
public function testSendWithPriority()
56+
{
57+
$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdPriorityStamp(2));
58+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
59+
60+
$connection = $this->createMock(Connection::class);
61+
$connection->expects($this->once())->method('send')->with($encoded['body'], $encoded['headers'], 0, 2);
4662

4763
$serializer = $this->createMock(SerializerInterface::class);
4864
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php
+87-2
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public function testReject(bool $buryOnReject, bool $forceDelete)
229229

230230
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);
231231

232-
$connection->reject((string) $id, $forceDelete);
232+
$connection->reject((string) $id, null, $forceDelete);
233233
}
234234

235235
public function testRejectWithBury()
@@ -240,13 +240,29 @@ public function testRejectWithBury()
240240

241241
$client = $this->createMock(PheanstalkInterface::class);
242242
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
243-
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
243+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);
244244

245245
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
246246

247247
$connection->reject((string) $id);
248248
}
249249

250+
public function testRejectWithBuryAndPriority()
251+
{
252+
$id = 123456;
253+
$priority = 2;
254+
255+
$tube = 'baz';
256+
257+
$client = $this->createMock(PheanstalkInterface::class);
258+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
259+
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);
260+
261+
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);
262+
263+
$connection->reject((string) $id, $priority);
264+
}
265+
250266
public function testRejectWhenABeanstalkdExceptionOccurs()
251267
{
252268
$id = 123456;
@@ -296,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
296312
$connection->getMessageCount();
297313
}
298314

315+
public function testMessagePriority()
316+
{
317+
$id = 123456;
318+
$priority = 51;
319+
320+
$tube = 'baz';
321+
322+
$response = new ArrayResponse('OK', ['pri' => $priority]);
323+
324+
$client = $this->createMock(PheanstalkInterface::class);
325+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);
326+
327+
$connection = new Connection(['tube_name' => $tube], $client);
328+
329+
$this->assertSame($priority, $connection->getMessagePriority((string) $id));
330+
}
331+
332+
public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
333+
{
334+
$id = 123456;
335+
336+
$tube = 'baz1234';
337+
338+
$exception = new ClientException('foobar error');
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
342+
343+
$connection = new Connection(['tube_name' => $tube], $client);
344+
345+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
346+
$connection->getMessagePriority((string) $id);
347+
}
348+
299349
public function testSend()
300350
{
301351
$tube = 'xyz';
@@ -330,6 +380,41 @@ public function testSend()
330380
$this->assertSame($id, (int) $returnedId);
331381
}
332382

383+
public function testSendWithPriority()
384+
{
385+
$tube = 'xyz';
386+
387+
$body = 'foo';
388+
$headers = ['test' => 'bar'];
389+
$delay = 1000;
390+
$priority = 2;
391+
$expectedDelay = $delay / 1000;
392+
393+
$id = 110;
394+
395+
$client = $this->createMock(PheanstalkInterface::class);
396+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
397+
$client->expects($this->once())->method('put')->with(
398+
$this->callback(function (string $data) use ($body, $headers): bool {
399+
$expectedMessage = json_encode([
400+
'body' => $body,
401+
'headers' => $headers,
402+
]);
403+
404+
return $expectedMessage === $data;
405+
}),
406+
$priority,
407+
$expectedDelay,
408+
90
409+
)->willReturn(new Job($id, 'foobar'));
410+
411+
$connection = new Connection(['tube_name' => $tube], $client);
412+
413+
$returnedId = $connection->send($body, $headers, $delay, $priority);
414+
415+
$this->assertSame($id, (int) $returnedId);
416+
}
417+
333418
public function testSendWhenABeanstalkdExceptionOccurs()
334419
{
335420
$tube = 'xyz';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
final class BeanstalkdPriorityStamp implements StampInterface
17+
{
18+
public function __construct(
19+
public readonly int $priority,
20+
) {
21+
}
22+
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php
+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public function get(): iterable
4848
'headers' => $beanstalkdEnvelope['headers'],
4949
]);
5050
} catch (MessageDecodingFailedException $exception) {
51-
$this->connection->reject($beanstalkdEnvelope['id']);
51+
$this->connection->reject($beanstalkdEnvelope['id'], $this->connection->getMessagePriority($beanstalkdEnvelope['id']));
5252

5353
throw $exception;
5454
}
@@ -65,6 +65,7 @@ public function reject(Envelope $envelope): void
6565
{
6666
$this->connection->reject(
6767
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
68+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
6869
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
6970
);
7071
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdSender.php
+6-5
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,12 @@ public function send(Envelope $envelope): Envelope
3535
{
3636
$encodedMessage = $this->serializer->encode($envelope);
3737

38-
/** @var DelayStamp|null $delayStamp */
39-
$delayStamp = $envelope->last(DelayStamp::class);
40-
$delayInMs = null !== $delayStamp ? $delayStamp->getDelay() : 0;
41-
42-
$this->connection->send($encodedMessage['body'], $encodedMessage['headers'] ?? [], $delayInMs);
38+
$this->connection->send(
39+
$encodedMessage['body'],
40+
$encodedMessage['headers'] ?? [],
41+
$envelope->last(DelayStamp::class)?->getDelay() ?? 0,
42+
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
43+
);
4344

4445
return $envelope;
4546
}

‎src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php
+17-5
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,12 @@ public function getTube(): string
117117
}
118118

119119
/**
120-
* @param int $delay The delay in milliseconds
120+
* @param int $delay The delay in milliseconds
121+
* @param ?int $priority The priority by which the message will be reserved
121122
*
122123
* @return string The inserted id
123124
*/
124-
public function send(string $body, array $headers, int $delay = 0): string
125+
public function send(string $body, array $headers, int $delay = 0, int $priority = null): string
125126
{
126127
$message = json_encode([
127128
'body' => $body,
@@ -135,7 +136,7 @@ public function send(string $body, array $headers, int $delay = 0): string
135136
try {
136137
$job = $this->client->useTube($this->tube)->put(
137138
$message,
138-
PheanstalkInterface::DEFAULT_PRIORITY,
139+
$priority ?? PheanstalkInterface::DEFAULT_PRIORITY,
139140
$delay / 1000,
140141
$this->ttr
141142
);
@@ -183,11 +184,11 @@ public function ack(string $id): void
183184
}
184185
}
185186

186-
public function reject(string $id, bool $forceDelete = false): void
187+
public function reject(string $id, int $priority = null, bool $forceDelete = false): void
187188
{
188189
try {
189190
if (!$forceDelete && $this->buryOnReject) {
190-
$this->client->useTube($this->tube)->bury(new JobId((int) $id));
191+
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
191192
} else {
192193
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
193194
}
@@ -207,4 +208,15 @@ public function getMessageCount(): int
207208

208209
return (int) $tubeStats['current-jobs-ready'];
209210
}
211+
212+
public function getMessagePriority(string $id): int
213+
{
214+
try {
215+
$jobStats = $this->client->statsJob(new JobId((int) $id));
216+
} catch (Exception $exception) {
217+
throw new TransportException($exception->getMessage(), 0, $exception);
218+
}
219+
220+
return (int) $jobStats['pri'];
221+
}
210222
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.