Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e0d5932

Browse filesBrowse files
Refractor using redis streams
1 parent 828f018 commit e0d5932
Copy full SHA for e0d5932

15 files changed

+257
-341
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ env:
1919
- MIN_PHP=7.1.3
2020
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
2121
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
22+
- MESSENGER_REDIS_DSN=redis://localhost/messages
2223

2324
matrix:
2425
include:

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
+13-14Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,35 @@
1919
*/
2020
class ConnectionTest extends TestCase
2121
{
22-
/**
23-
* @expectedException \InvalidArgumentException
24-
* @expectedExceptionMessage The given Redis DSN "redis://" is invalid.
25-
*/
26-
public function testItCannotBeConstructedWithAWrongDsn()
22+
public function testFromInvalidDns()
2723
{
24+
$this->expectException(\InvalidArgumentException::class);
25+
$this->expectExceptionMessage('The given Redis DSN "redis://" is invalid');
26+
2827
Connection::fromDsn('redis://');
2928
}
3029

31-
public function testItGetsParametersFromTheDsn()
30+
public function testFromDns()
3231
{
3332
$this->assertEquals(
34-
new Connection('queue', array(
33+
new Connection(['stream' => 'queue'], array(
3534
'host' => 'localhost',
3635
'port' => 6379,
3736
)),
3837
Connection::fromDsn('redis://localhost/queue')
3938
);
4039
}
4140

42-
public function testOverrideOptionsViaQueryParameters()
41+
public function testFromDnsWithOptionals()
4342
{
4443
$this->assertEquals(
45-
new Connection('queue', array(
46-
'host' => '127.0.0.1',
44+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
45+
'host' => 'localhost',
4746
'port' => 6379,
48-
), array(
49-
'processing_ttl' => '8000',
50-
)),
51-
Connection::fromDsn('redis://127.0.0.1:6379/queue?processing_ttl=8000')
47+
], [
48+
'blocking_timeout' => 30,
49+
]),
50+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?blocking_timeout=30')
5251
);
5352
}
5453
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/Fixtures/long_receiver.php
-43Lines changed: 0 additions & 43 deletions
This file was deleted.

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+6-6Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ protected function setUp()
4141
public function testItSendsAndReceivesMessages()
4242
{
4343
$serializer = new Serializer(
44-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
44+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
4545
);
4646

4747
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
@@ -65,7 +65,7 @@ public function testItSendsAndReceivesMessages()
6565
public function testItReceivesSignals()
6666
{
6767
$serializer = new Serializer(
68-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
68+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
6969
);
7070

7171
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'));
@@ -75,10 +75,10 @@ public function testItReceivesSignals()
7575

7676
$amqpReadTimeout = 30;
7777
$dsn = getenv('MESSENGER_REDIS_DSN').'?read_timeout='.$amqpReadTimeout;
78-
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, array(
78+
$process = new PhpProcess(file_get_contents(__DIR__.'/Fixtures/long_receiver.php'), null, [
7979
'COMPONENT_ROOT' => __DIR__.'/../../../',
8080
'DSN' => $dsn,
81-
));
81+
]);
8282

8383
$process->start();
8484

@@ -112,10 +112,10 @@ public function testItReceivesSignals()
112112
public function testItSupportsTimeoutAndTicksNullMessagesToTheHandler()
113113
{
114114
$serializer = new Serializer(
115-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
115+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
116116
);
117117

118-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), array('blocking_timeout' => '1'));
118+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['blocking_timeout' => '1']);
119119

120120
$receiver = new RedisReceiver($connection, $serializer);
121121

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisReceiverTest.php
+29-71Lines changed: 29 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -12,107 +12,65 @@
1212
namespace Symfony\Component\Messenger\Tests\Transport\RedisExt;
1313

1414
use PHPUnit\Framework\TestCase;
15-
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
1616
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1717
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
18-
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
1918
use Symfony\Component\Messenger\Transport\RedisExt\RedisReceiver;
19+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
2121
use Symfony\Component\Serializer as SerializerComponent;
2222
use Symfony\Component\Serializer\Encoder\JsonEncoder;
2323
use Symfony\Component\Serializer\Normalizer\ObjectNormalizer;
2424

25-
/**
26-
* @requires extension redis
27-
*/
2825
class RedisReceiverTest extends TestCase
2926
{
30-
public function testItSendTheDecodedMessageToTheHandlerAndAcknowledgeIt()
27+
public function testItReturnsTheDecodedMessageToTheHandler()
3128
{
32-
$serializer = new Serializer(
33-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
34-
);
35-
36-
$envelope = Envelope::wrap(new DummyMessage('Hi'));
37-
$encoded = $serializer->encode($envelope);
29+
$serializer = $this->createSerializer();
3830

31+
$redisEnvelop = $this->createRedisEnvelope();
3932
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
40-
$connection->method('waitAndGet')->willReturn($encoded);
41-
42-
$connection->expects($this->once())->method('ack')->with($encoded);
33+
$connection->method('get')->willReturn($redisEnvelop);
4334

4435
$receiver = new RedisReceiver($connection, $serializer);
45-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
46-
$this->assertEquals(new DummyMessage('Hi'), $envelope->getMessage());
47-
$receiver->stop();
48-
});
36+
$actualEnvelopes = iterator_to_array($receiver->get());
37+
$this->assertCount(1, $actualEnvelopes);
38+
$this->assertEquals(new DummyMessage('Hi'), $actualEnvelopes[0]->getMessage());
4939
}
5040

51-
public function testItSendNoMessageToTheHandler()
41+
public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
5242
{
53-
$serializer = new Serializer(
54-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
55-
);
43+
$this->expectException(MessageDecodingFailedException::class);
5644

45+
$serializer = $this->createMock(PhpSerializer::class);
46+
$serializer->method('decode')->willThrowException(new MessageDecodingFailedException());
47+
48+
$redisEnvelop = $this->createRedisEnvelope();
5749
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
58-
$connection->method('waitAndGet')->willReturn(null);
50+
$connection->method('get')->willReturn($redisEnvelop);
51+
$connection->expects($this->once())->method('reject');
5952

6053
$receiver = new RedisReceiver($connection, $serializer);
61-
$receiver->receive(function (?Envelope $envelope) use ($receiver) {
62-
$this->assertNull($envelope);
63-
$receiver->stop();
64-
});
54+
iterator_to_array($receiver->get());
6555
}
6656

67-
/**
68-
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\InterruptException
69-
*/
70-
public function testItNonAcknowledgeTheMessageIfAnExceptionHappened()
57+
private function createRedisEnvelope()
7158
{
72-
$serializer = new Serializer(
73-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
74-
);
75-
76-
$envelope = Envelope::wrap(new DummyMessage('Hi'));
77-
$encoded = $serializer->encode($envelope);
78-
79-
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
80-
$connection->method('waitAndGet')->willReturn($encoded);
81-
$connection->expects($this->once())->method('requeue')->with($encoded);
82-
83-
$receiver = new RedisReceiver($connection, $serializer);
84-
$receiver->receive(function () {
85-
throw new InterruptException('Well...');
86-
});
59+
return [
60+
'id' => 1,
61+
'body' => '{"message": "Hi"}',
62+
'headers' => [
63+
'type' => DummyMessage::class,
64+
],
65+
];
8766
}
8867

89-
/**
90-
* @expectedException \Symfony\Component\Messenger\Tests\Transport\RedisExt\WillNeverWorkException
91-
*/
92-
public function testItRejectsTheMessageIfTheExceptionIsARejectMessageExceptionInterface()
68+
private function createSerializer(): Serializer
9369
{
9470
$serializer = new Serializer(
95-
new SerializerComponent\Serializer(array(new ObjectNormalizer()), array('json' => new JsonEncoder()))
71+
new SerializerComponent\Serializer([new ObjectNormalizer()], ['json' => new JsonEncoder()])
9672
);
9773

98-
$envelope = Envelope::wrap(new DummyMessage('Hi'));
99-
$encoded = $serializer->encode($envelope);
100-
101-
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
102-
$connection->method('waitAndGet')->willReturn($encoded);
103-
$connection->expects($this->once())->method('reject')->with($encoded);
104-
105-
$receiver = new RedisReceiver($connection, $serializer);
106-
$receiver->receive(function () {
107-
throw new WillNeverWorkException('Well...');
108-
});
74+
return $serializer;
10975
}
11076
}
111-
112-
class InterruptException extends \Exception
113-
{
114-
}
115-
116-
class WillNeverWorkException extends \Exception implements RejectMessageExceptionInterface
117-
{
118-
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisSenderTest.php
+9-9Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,27 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Envelope;
16+
use Symfony\Component\Messenger\Stamp\DelayStamp;
1617
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
1718
use Symfony\Component\Messenger\Transport\RedisExt\Connection;
1819
use Symfony\Component\Messenger\Transport\RedisExt\RedisSender;
1920
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2021

21-
/**
22-
* @requires extension redis
23-
*/
2422
class RedisSenderTest extends TestCase
2523
{
26-
public function testItSendsTheEncodedMessage()
24+
public function testSend()
2725
{
28-
$envelope = Envelope::wrap(new DummyMessage('Oy'));
29-
$encoded = array('body' => '...', 'headers' => array('type' => DummyMessage::class));
26+
$envelope = new Envelope(new DummyMessage('Oy'));
27+
$encoded = ['body' => '...', 'headers' => ['type' => DummyMessage::class]];
28+
29+
$connection = $this->getMockBuilder(Connection::class)
30+
->disableOriginalConstructor()
31+
->getMock();
32+
$connection->expects($this->once())->method('add')->with($encoded);
3033

3134
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
3235
$serializer->method('encode')->with($envelope)->willReturnOnConsecutiveCalls($encoded);
3336

34-
$connection = $this->getMockBuilder(Connection::class)->disableOriginalConstructor()->getMock();
35-
$connection->expects($this->once())->method('add')->with($encoded);
36-
3737
$sender = new RedisSender($connection, $serializer);
3838
$sender->send($envelope);
3939
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportFactoryTest.php
+9-13Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,19 @@ class RedisTransportFactoryTest extends TestCase
2121
{
2222
public function testSupportsOnlyRedisTransports()
2323
{
24-
$factory = new RedisTransportFactory(
25-
$this->getMockBuilder(SerializerInterface::class)->getMock()
26-
);
24+
$factory = new RedisTransportFactory();
2725

28-
$this->assertTrue($factory->supports('redis://localhost', array()));
29-
$this->assertFalse($factory->supports('sqs://localhost', array()));
30-
$this->assertFalse($factory->supports('invalid-dsn', array()));
26+
$this->assertTrue($factory->supports('redis://localhost', []));
27+
$this->assertFalse($factory->supports('sqs://localhost', []));
28+
$this->assertFalse($factory->supports('invalid-dsn', []));
3129
}
3230

33-
public function testItCreatesTheTransport()
31+
public function testCreateTransport()
3432
{
35-
$factory = new RedisTransportFactory(
36-
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock()
37-
);
33+
$factory = new RedisTransportFactory();
34+
$serializer = $this->getMockBuilder(SerializerInterface::class)->getMock();
35+
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', ['foo' => 'bar']), $serializer);
3836

39-
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://localhost', array('foo' => 'bar'), true), $serializer);
40-
41-
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', array('foo' => 'bar')));
37+
$this->assertEquals($expectedTransport, $factory->createTransport('redis://localhost', ['foo' => 'bar'], $serializer));
4238
}
4339
}

‎src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisTransportTest.php
+9-10Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,6 @@
1919
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2020
use Symfony\Component\Messenger\Transport\TransportInterface;
2121

22-
/**
23-
* @requires extension redis
24-
*/
2522
class RedisTransportTest extends TestCase
2623
{
2724
public function testItIsATransport()
@@ -39,16 +36,18 @@ public function testReceivesMessages()
3936
);
4037

4138
$decodedMessage = new DummyMessage('Decoded.');
42-
$encodedMessage = array('body' => 'body', 'headers' => array('my' => 'header'));
4339

44-
$serializer->method('decode')->with($encodedMessage)->willReturn(Envelope::wrap($decodedMessage));
45-
$connection->method('waitAndGet')->willReturn($encodedMessage);
40+
$redisEnvelope = [
41+
'id' => '5',
42+
'body' => 'body',
43+
'headers' => ['my' => 'header'],
44+
];
4645

47-
$transport->receive(function (Envelope $envelope) use ($transport, $decodedMessage) {
48-
$this->assertSame($decodedMessage, $envelope->getMessage());
46+
$serializer->method('decode')->with(['body' => 'body', 'headers' => ['my' => 'header']])->willReturn(new Envelope($decodedMessage));
47+
$connection->method('get')->willReturn($redisEnvelope);
4948

50-
$transport->stop();
51-
});
49+
$envelopes = iterator_to_array($transport->get());
50+
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5251
}
5352

5453
private function getTransport(SerializerInterface $serializer = null, Connection $connection = null)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.