Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Add bury_on_reject option to Beanstalkd bridge #49652

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ CHANGELOG
---

* Add `BeanstalkdPriorityStamp` option to allow setting the message priority
* Add `bury_on_reject` option to bury failed messages instead of deleting them

7.2
---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@

use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdPriorityStamp;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
Expand Down Expand Up @@ -81,12 +83,42 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
$beanstalkdEnvelope = $this->createBeanstalkdEnvelope();
$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('get')->willReturn($beanstalkdEnvelope);
$connection->expects($this->once())->method('reject');
$connection->expects($this->once())->method('getMessagePriority')->with($beanstalkdEnvelope['id'])->willReturn(2);
$connection->expects($this->once())->method('reject')->with($beanstalkdEnvelope['id'], 2);

$receiver = new BeanstalkdReceiver($connection, $serializer);
$receiver->get();
}

/**
* @dataProvider provideRejectCases
*/
public function testReject(array $stamps, ?int $priority, bool $forceDelete)
{
$serializer = $this->createSerializer();

$id = 'some id';

$connection = $this->createMock(Connection::class);
$connection->expects($this->once())->method('reject')->with($id, $priority, $forceDelete);

$envelope = (new Envelope(new DummyMessage('Oy')))->with(new BeanstalkdReceivedStamp($id, 'foo bar'));
foreach ($stamps as $stamp) {
$envelope = $envelope->with($stamp);
}

$receiver = new BeanstalkdReceiver($connection, $serializer);
$receiver->reject($envelope);
}

public static function provideRejectCases(): iterable
{
yield 'No stamp' => [[], null, false];
yield 'With sent for retry true' => [[new SentForRetryStamp(true)], null, true];
yield 'With sent for retry true and priority' => [[new BeanstalkdPriorityStamp(2), new SentForRetryStamp(true)], 2, true];
yield 'With sent for retry false' => [[new SentForRetryStamp(false)], null, false];
}

public function testKeepalive()
{
$serializer = $this->createSerializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public function testFromDsn()
$this->assertSame('default', $configuration['tube_name']);
$this->assertSame(0, $configuration['timeout']);
$this->assertSame(90, $configuration['ttr']);
$this->assertFalse($configuration['bury_on_reject']);

$this->assertEquals(
$connection = new Connection([], Pheanstalk::create('foobar', 15555)),
Expand All @@ -58,22 +59,32 @@ public function testFromDsn()
$this->assertSame('default', $configuration['tube_name']);
$this->assertSame(0, $configuration['timeout']);
$this->assertSame(90, $configuration['ttr']);
$this->assertFalse($configuration['bury_on_reject']);
$this->assertSame('default', $connection->getTube());
}

public function testFromDsnWithOptions()
{
$this->assertEquals(
$connection = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000]),
Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000')
$connectionWithOptions = Connection::fromDsn('beanstalkd://localhost', ['tube_name' => 'foo', 'timeout' => 10, 'ttr' => 5000, 'bury_on_reject' => true]),
$connectionWithQuery = Connection::fromDsn('beanstalkd://localhost?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true')
);

$configuration = $connection->getConfiguration();
$configuration = $connectionWithOptions->getConfiguration();

$this->assertSame('foo', $configuration['tube_name']);
$this->assertSame(10, $configuration['timeout']);
$this->assertSame(5000, $configuration['ttr']);
$this->assertSame('foo', $connection->getTube());
$this->assertTrue($configuration['bury_on_reject']);
$this->assertSame('foo', $connectionWithOptions->getTube());

$configuration = $connectionWithQuery->getConfiguration();

$this->assertSame('foo', $configuration['tube_name']);
$this->assertSame(10, $configuration['timeout']);
$this->assertSame(5000, $configuration['ttr']);
$this->assertTrue($configuration['bury_on_reject']);
$this->assertSame('foo', $connectionWithOptions->getTube());
}

public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
Expand All @@ -82,18 +93,20 @@ public function testFromDsnOptionsArrayWinsOverOptionsFromDsn()
'tube_name' => 'bar',
'timeout' => 20,
'ttr' => 6000,
'bury_on_reject' => false,
];

$this->assertEquals(
$connection = new Connection($options, Pheanstalk::create('localhost', 11333)),
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000', $options)
Connection::fromDsn('beanstalkd://localhost:11333?tube_name=foo&timeout=10&ttr=5000&bury_on_reject=true', $options)
);

$configuration = $connection->getConfiguration();

$this->assertSame($options['tube_name'], $configuration['tube_name']);
$this->assertSame($options['timeout'], $configuration['timeout']);
$this->assertSame($options['ttr'], $configuration['ttr']);
$this->assertSame($options['bury_on_reject'], $configuration['bury_on_reject']);
$this->assertSame($options['tube_name'], $connection->getTube());
}

Expand Down Expand Up @@ -199,7 +212,12 @@ public function testAckWhenABeanstalkdExceptionOccurs()
$connection->ack((string) $id);
}

public function testReject()
/**
* @testWith [false, false]
* [false, true]
* [true, true]
*/
public function testReject(bool $buryOnReject, bool $forceDelete)
{
$id = 123456;

Expand All @@ -209,11 +227,42 @@ public function testReject()
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));

$connection = new Connection(['tube_name' => $tube], $client);
$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);

$connection->reject((string) $id, null, $forceDelete);
}

public function testRejectWithBury()
{
$id = 123456;

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);

$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);

$connection->reject((string) $id);
}

public function testRejectWithBuryAndPriority()
{
$id = 123456;
$priority = 2;

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);

$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);

$connection->reject((string) $id, $priority);
}

public function testRejectWhenABeanstalkdExceptionOccurs()
{
$id = 123456;
Expand Down Expand Up @@ -263,6 +312,40 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
$connection->getMessageCount();
}

public function testMessagePriority()
{
$id = 123456;
$priority = 51;

$tube = 'baz';

$response = new ArrayResponse('OK', ['pri' => $priority]);

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);

$connection = new Connection(['tube_name' => $tube], $client);

$this->assertSame($priority, $connection->getMessagePriority((string) $id));
}

public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
{
$id = 123456;

$tube = 'baz1234';

$exception = new ClientException('foobar error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube], $client);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->getMessagePriority((string) $id);
}

public function testSend()
{
$tube = 'xyz';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\LogicException;
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
Expand Down Expand Up @@ -48,7 +49,10 @@ public function get(): iterable
'headers' => $beanstalkdEnvelope['headers'],
]);
} catch (MessageDecodingFailedException $exception) {
$this->connection->reject($beanstalkdEnvelope['id']);
$this->connection->reject(
$beanstalkdEnvelope['id'],
$this->connection->getMessagePriority($beanstalkdEnvelope['id']),
);

throw $exception;
}
Expand All @@ -68,7 +72,11 @@ public function ack(Envelope $envelope): void

public function reject(Envelope $envelope): void
{
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
$this->connection->reject(
$this->findBeanstalkdReceivedStamp($envelope)->getId(),
$envelope->last(BeanstalkdPriorityStamp::class)?->priority,
$envelope->last(SentForRetryStamp::class)?->isSent ?? false,
);
}

public function keepalive(Envelope $envelope, ?int $seconds = null): void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class Connection
'tube_name' => PheanstalkInterface::DEFAULT_TUBE,
'timeout' => 0,
'ttr' => 90,
'bury_on_reject' => false,
];

private string $tube;
private int $timeout;
private int $ttr;
private bool $buryOnReject;

/**
* Constructor.
Expand All @@ -46,6 +48,7 @@ class Connection
* * tube_name: name of the tube
* * timeout: message reservation timeout (in seconds)
* * ttr: the message time to run before it is put back in the ready queue (in seconds)
* * bury_on_reject: bury rejected messages instead of deleting them
*/
public function __construct(
private array $configuration,
Expand All @@ -55,6 +58,7 @@ public function __construct(
$this->tube = $this->configuration['tube_name'];
$this->timeout = $this->configuration['timeout'];
$this->ttr = $this->configuration['ttr'];
$this->buryOnReject = $this->configuration['bury_on_reject'];
}

public static function fromDsn(#[\SensitiveParameter] string $dsn, array $options = []): self
Expand All @@ -74,7 +78,15 @@ public static function fromDsn(#[\SensitiveParameter] string $dsn, array $option
}

$configuration = [];
$configuration += $options + $query + self::DEFAULT_OPTIONS;
foreach (self::DEFAULT_OPTIONS as $k => $v) {
$value = $options[$k] ?? $query[$k] ?? $v;

$configuration[$k] = match (\gettype($v)) {
'integer' => filter_var($value, \FILTER_VALIDATE_INT),
'boolean' => filter_var($value, \FILTER_VALIDATE_BOOL),
default => $value,
};
}

// check for extra keys in options
$optionsExtraKeys = array_diff(array_keys($options), array_keys(self::DEFAULT_OPTIONS));
Expand Down Expand Up @@ -172,10 +184,14 @@ public function ack(string $id): void
}
}

public function reject(string $id): void
public function reject(string $id, ?int $priority = null, bool $forceDelete = false): void
{
try {
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
if (!$forceDelete && $this->buryOnReject) {
$this->client->useTube($this->tube)->bury(new JobId((int) $id), $priority ?? PheanstalkInterface::DEFAULT_PRIORITY);
} else {
$this->client->useTube($this->tube)->delete(new JobId((int) $id));
}
} catch (Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}
Expand All @@ -201,4 +217,15 @@ public function getMessageCount(): int

return (int) $tubeStats['current-jobs-ready'];
}

public function getMessagePriority(string $id): int
{
try {
$jobStats = $this->client->statsJob(new JobId((int) $id));
} catch (Exception $exception) {
throw new TransportException($exception->getMessage(), 0, $exception);
}

return (int) $jobStats['pri'];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"require": {
"php": ">=8.2",
"pda/pheanstalk": "^4.0",
"symfony/messenger": "^7.2"
"symfony/messenger": "^7.3"
},
"require-dev": {
"symfony/property-access": "^6.4|^7.0",
Expand Down
5 changes: 5 additions & 0 deletions 5 src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

7.3
---

* Add `SentForRetryStamp` that identifies whether a failed message was sent for retry

7.2
---

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
use Symfony\Component\Messenger\Retry\RetryStrategyInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
use Symfony\Component\Messenger\Stamp\SentForRetryStamp;
use Symfony\Component\Messenger\Stamp\StampInterface;
use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
Expand Down Expand Up @@ -82,6 +83,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void
} else {
$this->logger?->critical('Error thrown while handling message {class}. Removing from transport after {retryCount} retries. Error: "{error}"', $context + ['retryCount' => $retryCount, 'error' => $throwable->getMessage(), 'exception' => $throwable]);
}

$event->addStamps(new SentForRetryStamp($shouldRetry));
}

/**
Expand Down
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.