Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit e355a4e

Browse filesBrowse files
Refractor using redis streams
1 parent 828f018 commit e355a4e
Copy full SHA for e355a4e

File tree

7 files changed

+185
-170
lines changed
Filter options

7 files changed

+185
-170
lines changed

‎src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+53-92Lines changed: 53 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -12,45 +12,49 @@
1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

1414
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
15+
use Symfony\Component\Messenger\Exception\LogicException;
1516

1617
/**
18+
* A Redis connection.
19+
*
1720
* @author Antoine Bluchet <soyuka@gmail.com>
21+
* @author Alexander Schranz <alexander@sulu.io>
22+
*
23+
* @final
24+
*
25+
* @experimental in 4.3
1826
*/
1927
class Connection
2028
{
21-
const PROCESSING_QUEUE_SUFFIX = '_processing';
22-
const DEFAULT_CONNECTION_CREDENTIALS = array('host' => '127.0.0.1', 'port' => 6379);
23-
const DEFAULT_REDIS_OPTIONS = array('serializer' => \Redis::SERIALIZER_PHP, 'processing_ttl' => 10000, 'blocking_timeout' => 1000);
24-
25-
/**
26-
* @var \Redis
27-
*/
2829
private $connection;
30+
private $stream;
31+
private $group;
32+
private $consumer;
33+
private $blockingTimeout;
2934

30-
/**
31-
* @var string
32-
*/
33-
private $queue;
34-
35-
public function __construct(string $queue, array $connectionCredentials = self::DEFAULT_CONNECTION_CREDENTIALS, array $redisOptions = self::DEFAULT_REDIS_OPTIONS)
35+
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [])
3636
{
3737
$this->connection = new \Redis();
3838
$this->connection->connect($connectionCredentials['host'] ?? '127.0.0.1', $connectionCredentials['port'] ?? 6379);
3939
$this->connection->setOption(\Redis::OPT_SERIALIZER, $redisOptions['serializer'] ?? \Redis::SERIALIZER_PHP);
40-
// We force this because we rely on the fact that redis doesn't timeout with bRPopLPush
41-
$this->connection->setOption(\Redis::OPT_READ_TIMEOUT, -1);
42-
$this->queue = $queue;
43-
$this->processingTtl = $redisOptions['processing_ttl'] ?? self::DEFAULT_REDIS_OPTIONS['processing_ttl'];
44-
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? self::DEFAULT_REDIS_OPTIONS['blocking_timeout'];
40+
$this->stream = $configuration['stream'] ?? 'messages';
41+
$this->group = $configuration['group'] ?? 'symfony';
42+
$this->consumer = $configuration['consumer'] ?? 'consumer';
43+
$this->blockingTimeout = $redisOptions['blocking_timeout'] ?? null;
4544
}
4645

47-
public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_REDIS_OPTIONS): self
46+
public static function fromDsn(string $dsn, array $redisOptions = []): self
4847
{
4948
if (false === $parsedUrl = parse_url($dsn)) {
5049
throw new InvalidArgumentException(sprintf('The given Redis DSN "%s" is invalid.', $dsn));
5150
}
5251

53-
$queue = isset($parsedUrl['path']) ? trim($parsedUrl['path'], '/') : $redisOptions['queue'] ?? 'messages';
52+
$pathParts = explode('/', $parsedUrl['path']);
53+
54+
$stream = $pathParts[1] ?? '';
55+
$group = $pathParts[2] ?? '';
56+
$consumer = $pathParts[3] ?? '';
57+
5458
$connectionCredentials = array(
5559
'host' => $parsedUrl['host'] ?? '127.0.0.1',
5660
'port' => $parsedUrl['port'] ?? 6379,
@@ -61,96 +65,53 @@ public static function fromDsn(string $dsn, array $redisOptions = self::DEFAULT_
6165
$redisOptions = array_replace_recursive($redisOptions, $parsedQuery);
6266
}
6367

64-
return new self($queue, $connectionCredentials, $redisOptions);
68+
return new self(['stream' => $stream, 'group' => $group, 'consumer' => $consumer], $connectionCredentials, $redisOptions);
6569
}
6670

67-
/**
68-
* Takes last element (tail) of the list and add it to the processing queue (head - blocking)
69-
* Also sets a key with TTL that will be checked by the `doCheck` method.
70-
*/
71-
public function waitAndGet(): ?array
71+
public function get(): iterable
7272
{
73-
$this->doCheck();
74-
$value = $this->connection->bRPopLPush($this->queue, $this->queue.self::PROCESSING_QUEUE_SUFFIX, $this->blockingTimeout);
73+
$messages = $this->connection->xreadgroup(
74+
$this->group,
75+
$this->consumer,
76+
[$this->stream => '>'],
77+
1,
78+
$this->blockingTimeout
79+
);
7580

76-
// false in case of timeout
77-
if (false === $value) {
78-
return null;
81+
if (false === $messages) {
82+
throw new LogicException(
83+
$this->connection->getLastError() ?: 'Unexpected redis stream error happened.'
84+
);
7985
}
8086

81-
$key = md5($value['body']);
82-
$this->connection->set($key, 1, array('px' => $this->processingTtl));
83-
84-
return $value;
85-
}
87+
foreach ($messages[$this->stream] as $key => $message) {
88+
$redisEnvelope = \json_decode($message, true);
8689

87-
/**
88-
* Acknowledge the message:
89-
* 1. Remove the ttl key
90-
* 2. LREM the message from the processing list.
91-
*/
92-
public function ack($message)
93-
{
94-
$key = md5($message['body']);
95-
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
96-
$this->connection->multi()
97-
->lRem($processingQueue, $message)
98-
->del($key)
99-
->exec();
90+
yield [
91+
'id' => $key,
92+
'body' => $redisEnvelope['headers'],
93+
'headers' => $redisEnvelope['headers'],
94+
];
95+
}
10096
}
10197

102-
/**
103-
* Reject the message: we acknowledge it, means we remove it form the queues.
104-
*
105-
* @TODO: log something?
106-
*/
107-
public function reject($message)
98+
public function ack(string $id): bool
10899
{
109-
$this->ack($message);
100+
$this->connection->xack($this->stream, $this->group, [$id]);
110101
}
111102

112-
/**
113-
* Requeue - add it back to the queue
114-
* All we have to do is to make our key expire and let the `doCheck` system manage it.
115-
*/
116-
public function requeue($message)
103+
public function reject(string $id): bool
117104
{
118-
$key = md5($message['body']);
119-
$this->connection->expire($key, -1);
105+
$this->connection->xdel($this->stream, [$id]);
120106
}
121107

122-
/**
123-
* Add item at the tail of list.
124-
*/
125-
public function add($message)
108+
public function add(array $message, int $delay)
126109
{
127-
$this->connection->lpush($this->queue, $message);
110+
$this->connection->xadd($this->stream, '*', ['content' => json_encode($message)]);
128111
}
129112

130-
/**
131-
* The check:
132-
* 1. Get the processing queue items
133-
* 2. Check if the TTL is over
134-
* 3. If it is, rpush back the message to the origin queue.
135-
*/
136-
private function doCheck()
113+
public function setup(): void
137114
{
138-
$processingQueue = $this->queue.self::PROCESSING_QUEUE_SUFFIX;
139-
$pending = $this->connection->lRange($processingQueue, 0, -1);
140-
141-
foreach ($pending as $temp) {
142-
$key = md5($temp['body']);
143-
144-
if ($this->connection->ttl($key) > 0) {
145-
continue;
146-
}
147-
148-
$this->connection
149-
->multi()
150-
->del($key)
151-
->lRem($processingQueue, $temp, 1)
152-
->rPush($this->queue, $temp)
153-
->exec();
154-
}
115+
$this->connection->xgroup('CREATE', $this->stream, $this->group, 0, true);
155116
}
156117
}

‎src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/Exception/RejectMessageExceptionInterface.php
-25Lines changed: 0 additions & 25 deletions
This file was deleted.
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Transport\RedisExt;
13+
14+
use Symfony\Component\Messenger\Stamp\StampInterface;
15+
16+
/**
17+
* @author Alexander Schranz <alexander@sulu.io>
18+
*
19+
* @experimental in 4.3
20+
*/
21+
class RedisReceivedStamp implements StampInterface
22+
{
23+
private $id;
24+
25+
public function __construct(string $id)
26+
{
27+
$this->id = $id;
28+
}
29+
30+
public function getId(): string
31+
{
32+
return $this->id;
33+
}
34+
}

‎src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Transport/RedisExt/RedisReceiver.php
+48-31Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,61 +11,78 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\RedisExt;
1313

14-
use Symfony\Component\Messenger\Transport\ReceiverInterface;
15-
use Symfony\Component\Messenger\Transport\RedisExt\Exception\RejectMessageExceptionInterface;
14+
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Exception\LogicException;
16+
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
18+
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1619
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
1720

1821
/**
1922
* @author Antoine Bluchet <soyuka@gmail.com>
23+
* @author Alexander Schranz <alexander@sulu.io>
2024
*/
2125
class RedisReceiver implements ReceiverInterface
2226
{
2327
private $connection;
2428
private $serializer;
25-
private $shouldStop = false;
2629

27-
public function __construct(Connection $connection, SerializerInterface $serializer)
30+
public function __construct(Connection $connection, SerializerInterface $serializer = null)
2831
{
2932
$this->connection = $connection;
30-
$this->serializer = $serializer;
33+
$this->serializer = $serializer ?? new PhpSerializer();
3134
}
3235

3336
/**
3437
* {@inheritdoc}
3538
*/
36-
public function receive(callable $handler): void
39+
public function get(): iterable
3740
{
38-
while (!$this->shouldStop) {
39-
if (null === $message = $this->connection->waitAndGet()) {
40-
$handler(null);
41-
if (\function_exists('pcntl_signal_dispatch')) {
42-
pcntl_signal_dispatch();
43-
}
41+
$redisEnvelope = $this->connection->get();
4442

45-
continue;
46-
}
47-
48-
try {
49-
$handler($this->serializer->decode($message));
50-
$this->connection->ack($message);
51-
} catch (RejectMessageExceptionInterface $e) {
52-
$this->connection->reject($message);
43+
if (null === $redisEnvelope) {
44+
return [];
45+
}
5346

54-
throw $e;
55-
} catch (\Throwable $e) {
56-
$this->connection->requeue($message);
47+
try {
48+
$envelope = $this->serializer->decode([
49+
'body' => $redisEnvelope['body'],
50+
'headers' => $redisEnvelope['headers'],
51+
]);
52+
} catch (MessageDecodingFailedException $exception) {
53+
$this->connection->reject($redisEnvelope['id']);
5754

58-
throw $e;
59-
} finally {
60-
if (\function_exists('pcntl_signal_dispatch')) {
61-
pcntl_signal_dispatch();
62-
}
63-
}
55+
throw $exception;
6456
}
57+
58+
yield $envelope->with(new RedisReceivedStamp($redisEnvelope['id']));
59+
}
60+
61+
/**
62+
* {@inheritdoc}
63+
*/
64+
public function ack(Envelope $envelope): void
65+
{
66+
$this->connection->ack($this->findRedisReceivedStamp($envelope)->getId());
6567
}
6668

67-
public function stop(): void
69+
/**
70+
* {@inheritdoc}
71+
*/
72+
public function reject(Envelope $envelope): void
6873
{
69-
$this->shouldStop = true;
74+
$this->connection->reject($this->findRedisReceivedStamp($envelope)->getId());
75+
}
76+
77+
private function findRedisReceivedStamp(Envelope $envelope): RedisReceivedStamp
78+
{
79+
/** @var RedisReceivedStamp|null $redisReceivedStamp */
80+
$redisReceivedStamp = $envelope->last(RedisReceivedStamp::class);
81+
82+
if (null === $redisReceivedStamp) {
83+
throw new LogicException('No RedisReceivedStamp found on the Envelope.');
84+
}
85+
86+
return $redisReceivedStamp;
7087
}
7188
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.