Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Use newer version of Beanstalkd bridge library #60040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion 2 composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@
"monolog/monolog": "^3.0",
"nikic/php-parser": "^4.18|^5.0",
"nyholm/psr7": "^1.0",
"pda/pheanstalk": "^4.0",
"pda/pheanstalk": "^5.1|^7.0",
Copy link
Member Author

@HypeMC HypeMC Mar 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the versions that have the disconnect() method, which is needed for the reconnect mechanism. Also, v7 supports only PHP 8.3+, while v5 supports PHP 8.1+.

"php-http/discovery": "^1.15",
"php-http/httplug": "^1.0|^2.0",
"phpdocumentor/reflection-docblock": "^5.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@

namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Transport;

use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Contract\PheanstalkManagerInterface;
use Pheanstalk\Contract\PheanstalkPublisherInterface;
use Pheanstalk\Contract\PheanstalkSubscriberInterface;
use Pheanstalk\Exception;
use Pheanstalk\Exception\ClientException;
use Pheanstalk\Exception\DeadlineSoonException;
use Pheanstalk\Exception\ServerException;
use Pheanstalk\Job;
use Pheanstalk\JobId;
use Pheanstalk\Pheanstalk;
use Pheanstalk\Response\ArrayResponse;
use Pheanstalk\Values\Job;
use Pheanstalk\Values\JobId;
use Pheanstalk\Values\JobState;
use Pheanstalk\Values\JobStats;
use Pheanstalk\Values\TubeList;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\TubeStats;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
use Symfony\Component\Messenger\Exception\InvalidArgumentException as MessengerInvalidArgumentException;
Expand Down Expand Up @@ -124,7 +130,7 @@ public function testItThrowsAnExceptionIfAnExtraOptionIsDefinedInDSN()

public function testGet()
{
$id = 1234;
$id = '1234';
$beanstalkdEnvelope = [
'body' => 'foo',
'headers' => 'bar',
Expand All @@ -133,17 +139,20 @@ public function testGet()
$tube = 'baz';
$timeout = 44;

$job = new Job($id, json_encode($beanstalkdEnvelope));
$tubeList = new TubeList($tubeName = new TubeName($tube), $tubeNameDefault = new TubeName('default'));
$job = new Job(new JobId($id), json_encode($beanstalkdEnvelope));

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
$client->expects($this->once())->method('watch')->with($tubeName)->willReturn(2);
$client->expects($this->once())->method('listTubesWatched')->willReturn($tubeList);
$client->expects($this->once())->method('ignore')->with($tubeNameDefault)->willReturn(1);
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn($job);

$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);

$envelope = $connection->get();

$this->assertSame((string) $id, $envelope['id']);
$this->assertSame($id, $envelope['id']);
$this->assertSame($beanstalkdEnvelope['body'], $envelope['body']);
$this->assertSame($beanstalkdEnvelope['headers'], $envelope['headers']);
}
Expand All @@ -154,7 +163,9 @@ public function testGetWhenThereIsNoJobInTheTube()
$timeout = 44;

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
$client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1);
$client->expects($this->never())->method('listTubesWatched');
$client->expects($this->never())->method('ignore');
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willReturn(null);

$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
Expand All @@ -170,7 +181,9 @@ public function testGetWhenABeanstalkdExceptionOccurs()
$exception = new DeadlineSoonException('foo error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('watchOnly')->with($tube)->willReturn($client);
$client->expects($this->once())->method('watch')->with(new TubeName($tube))->willReturn(1);
$client->expects($this->never())->method('listTubesWatched');
$client->expects($this->never())->method('ignore');
$client->expects($this->once())->method('reserveWithTimeout')->with($timeout)->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube, 'timeout' => $timeout], $client);
Expand All @@ -181,35 +194,35 @@ public function testGetWhenABeanstalkdExceptionOccurs()

public function testAck()
{
$id = 123456;
$id = '123456';

$tube = 'xyz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));

$connection = new Connection(['tube_name' => $tube], $client);

$connection->ack((string) $id);
$connection->ack($id);
}

public function testAckWhenABeanstalkdExceptionOccurs()
{
$id = 123456;
$id = '123456';

$tube = 'xyzw';

$exception = new ServerException('baz error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube], $client);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->ack((string) $id);
$connection->ack($id);
}

/**
Expand All @@ -219,66 +232,66 @@ public function testAckWhenABeanstalkdExceptionOccurs()
*/
public function testReject(bool $buryOnReject, bool $forceDelete)
{
$id = 123456;
$id = '123456';

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));

$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => $buryOnReject], $client);

$connection->reject((string) $id, null, $forceDelete);
$connection->reject($id, null, $forceDelete);
}

public function testRejectWithBury()
{
$id = 123456;
$id = '123456';

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), 1024);

$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);

$connection->reject((string) $id);
$connection->reject($id);
}

public function testRejectWithBuryAndPriority()
{
$id = 123456;
$id = '123456';
$priority = 2;

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('bury')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id), $priority);

$connection = new Connection(['tube_name' => $tube, 'bury_on_reject' => true], $client);

$connection->reject((string) $id, $priority);
$connection->reject($id, $priority);
}

public function testRejectWhenABeanstalkdExceptionOccurs()
{
$id = 123456;
$id = '123456';

$tube = 'baz123';

$exception = new ServerException('baz error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('delete')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube], $client);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->reject((string) $id);
$connection->reject($id);
}

public function testMessageCount()
Expand All @@ -287,10 +300,11 @@ public function testMessageCount()

$count = 51;

$response = new ArrayResponse('OK', ['current-jobs-ready' => $count]);
$response = new TubeStats($tubeName = new TubeName($tube), 0, 51, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('statsTube')->with($tube)->willReturn($response);
$client->expects($this->once())->method('useTube')->with($tubeName);
$client->expects($this->once())->method('statsTube')->with($tubeName)->willReturn($response);

$connection = new Connection(['tube_name' => $tube], $client);

Expand All @@ -304,7 +318,7 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()
$exception = new ClientException('foobar error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('statsTube')->with($tube)->willThrowException($exception);
$client->expects($this->once())->method('statsTube')->with(new TubeName($tube))->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube], $client);

Expand All @@ -314,24 +328,24 @@ public function testMessageCountWhenABeanstalkdExceptionOccurs()

public function testMessagePriority()
{
$id = 123456;
$id = '123456';
$priority = 51;

$tube = 'baz';

$response = new ArrayResponse('OK', ['pri' => $priority]);
$response = new JobStats(new JobId($id), new TubeName($tube), JobState::READY, $priority, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('statsJob')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willReturn($response);

$connection = new Connection(['tube_name' => $tube], $client);

$this->assertSame($priority, $connection->getMessagePriority((string) $id));
$this->assertSame($priority, $connection->getMessagePriority($id));
}

public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
{
$id = 123456;
$id = '123456';

$tube = 'baz1234';

Expand All @@ -343,7 +357,7 @@ public function testMessagePriorityWhenABeanstalkdExceptionOccurs()
$connection = new Connection(['tube_name' => $tube], $client);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->getMessagePriority((string) $id);
$connection->getMessagePriority($id);
}

public function testSend()
Expand All @@ -355,10 +369,10 @@ public function testSend()
$delay = 1000;
$expectedDelay = $delay / 1000;

$id = 110;
$id = '110';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('put')->with(
$this->callback(function (string $data) use ($body, $headers): bool {
$expectedMessage = json_encode([
Expand All @@ -371,13 +385,13 @@ public function testSend()
1024,
$expectedDelay,
90
)->willReturn(new Job($id, 'foobar'));
)->willReturn(new Job(new JobId($id), 'foobar'));

$connection = new Connection(['tube_name' => $tube], $client);

$returnedId = $connection->send($body, $headers, $delay);

$this->assertSame($id, (int) $returnedId);
$this->assertSame($id, $returnedId);
}

public function testSendWithPriority()
Expand All @@ -390,10 +404,10 @@ public function testSendWithPriority()
$priority = 2;
$expectedDelay = $delay / 1000;

$id = 110;
$id = '110';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('put')->with(
$this->callback(function (string $data) use ($body, $headers): bool {
$expectedMessage = json_encode([
Expand All @@ -406,13 +420,13 @@ public function testSendWithPriority()
$priority,
$expectedDelay,
90
)->willReturn(new Job($id, 'foobar'));
)->willReturn(new Job(new JobId($id), 'foobar'));

$connection = new Connection(['tube_name' => $tube], $client);

$returnedId = $connection->send($body, $headers, $delay, $priority);

$this->assertSame($id, (int) $returnedId);
$this->assertSame($id, $returnedId);
}

public function testSendWhenABeanstalkdExceptionOccurs()
Expand All @@ -427,7 +441,7 @@ public function testSendWhenABeanstalkdExceptionOccurs()
$exception = new Exception('foo bar');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('put')->with(
$this->callback(function (string $data) use ($body, $headers): bool {
$expectedMessage = json_encode([
Expand All @@ -451,35 +465,35 @@ public function testSendWhenABeanstalkdExceptionOccurs()

public function testKeepalive()
{
$id = 123456;
$id = '123456';

$tube = 'baz';

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));

$connection = new Connection(['tube_name' => $tube], $client);

$connection->keepalive((string) $id);
$connection->keepalive($id);
}

public function testKeepaliveWhenABeanstalkdExceptionOccurs()
{
$id = 123456;
$id = '123456';

$tube = 'baz123';

$exception = new ServerException('baz error');

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);

$connection = new Connection(['tube_name' => $tube], $client);

$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
$connection->keepalive((string) $id);
$connection->keepalive($id);
}

public function testSendWithRoundedDelay()
Expand All @@ -491,7 +505,7 @@ public function testSendWithRoundedDelay()
$expectedDelay = 0;

$client = $this->createMock(PheanstalkInterface::class);
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
$client->expects($this->once())->method('useTube')->with(new TubeName($tube));
$client->expects($this->once())->method('put')->with(
$this->anything(),
$this->anything(),
Expand All @@ -503,3 +517,7 @@ public function testSendWithRoundedDelay()
$connection->send($body, $headers, $delay);
}
}

interface PheanstalkInterface extends PheanstalkPublisherInterface, PheanstalkSubscriberInterface, PheanstalkManagerInterface
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PheanstalkInterface was split into multiple interfaces. Since the client implements all of them, it's hard to mock. This is a helper interface to make mocking easier.

{
}
Loading
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.