-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add receiving of old pending messages (redis) #35384
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Thanks for the PR. I'll find time to give my thoughts asap. |
As written with you on slack @toooni this should moved into a function call inside the following if line: symfony/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php Lines 174 to 176 in 9f93a38
so e.g.: if ($this->couldHavePendingMessages) {
if ($this->isRandomConsumer) {
$this->claimOldMessages(); // this function does the xpending and xclaim
// the logic after this doesn't need to change as it will read the correct messages
}
$messageId = '0'; // will receive consumers pending/claimed messages
} as it is required to have a uniqid consumer name for your flag you should do that in the constructor of the connection here:
e.g.: if ($configuration['randomconsumer'] === 'true') {
$this->consumer .= '_'.uniqid();
$this->isRandomConsumer = true;
} and currently you still will end in a race condition as you still use the same consumer name else both will claim the same messages. The couldHavePendingMessages should be at start on true and only then claim old messages not in every loop so that flag you should not touch in your PR when you move the logic to the claimOldMessages as written above. The whole logic should be behind a flag e.g. CONSUMER_NAME=consumer1 bin/console messenger:consume ...
CONSUMER_NAME=consumer2 bin/console messenger:consume ...
CONSUMER_NAME=consumer3 bin/console messenger:consume ... And for kubernetes the stateful should be used. This implementation is something I would only use if you have no control which with env the consumer are started and you wanted to have multiple consumers. |
@alexander-schranz I don't think so. With the PR the flow looks like the following (It's not everything we talked about):
This PR currently has nothing to do with random consumer names. I intentionally didn't include this in the PR because it's not directly related to this. The benefit of this PR is that no messages are being lost - even if you are using random consumer names or for example the hostname as the consumer name (or you are just switching to a lower number of consumers). Scaling consumers up or down without this feature is not really possible. To have |
@toooni currenty never a message is lost because on restart the couldHavePendingMessages is true and will read all messages which are not acked so this statement is not true when using it the correct way. Using this implementation you provided without a random consumer name could end up in the same error you have in your issue as if 2 consume messages start at the same time a race condition between xpending and xclaim both processes would have the same consumer name and so they will process the same messages the xclaim doesnt do anything in this case as the old messages where the same consumer name. So the only thing is that we stopped reading crashed pending messages immediately after the crash and make the maybe exception a little less common because it happens later at the time-lmit but it still can happen at that place if the processes start at the same time, so for me this way is not a fix of that issue then. Unless that if we really want to implement it this way we should move that xpending <-> xclaim part into a function and call that not on every |
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/ConnectionTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
I tested a little bit around and thing we should implement it this way then @toooni: We only claim messages if there are no pending messages so we don't change the old behaviour. private $couldHavePendingMessages = true;
private $lastClaim = 0;
private $claimInterval = 1000; // not sure in which interval we want to check for claim messages maybe evey 10 seconds? parameter should be configureable ?claim-interval=
public function get()
{
// ...
if (!$this->couldHavePendingMessages && ($this->lastClaim + $this->claimInterval) > time()) {
$this->claimOldPendingMessages();
}
$messageId = '>'; // will receive new messages
if ($this->couldHavePendingMessages) {
$messageId = '0'; // will receive pending messages
}
// ...
}
private function claimOldPendingMessages()
{
try {
// This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
// https://github.com/antirez/redis/issues/6256
$pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 3);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
$claimableIds = [];
foreach ($pendingMessages as $pendingMessage) {
if ($pendingMessage[1] === $this->consumer) {
$this->couldHavePendingMessages = true;
continue;
}
if ($pendingMessage[2] < (time() - $this->redeliveryTime)) {
$claimableIds[] = $pendingMessage[0];
}
}
if (\count($claimableIds) > 0) {
try {
$this->connection->xclaim(
$this->stream,
$this->group,
$this->consumer,
$this->claimTimeout,
$claimableIds,
['JUSTID']
);
$this->couldHavePendingMessages = true;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}
}
$this->lastClaim = time();
} With that claimInterval it will be possible to disable the behaviour if somebody is using a |
Just for traceability:
The messages are lost if there won't be a consumer with the same name again. This isn't only an issue if you want to use random names but also if you want to scale your workers up and down. |
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
Oh yeah you are totally correct about that use case 👍 some little changes then it looks good for me. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@toooni thx for your time and patience. Code looks code from my side need to test it in the evening.
@toooni as the xpending response doesnt seems to look the same for me I needed to change how the message timestamp is read. And I needed to disable the claimIdleTime for the integration test as symfony clockmock have no effect on the redis min idle time on xclaim. diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
index 537c3fba68..04a61e0e9c 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
@@ -32,6 +32,7 @@ class RedisExtIntegrationTest extends TestCase
$this->redis = new \Redis();
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
+ $this->connection->disableClaimIdleTime(true);
$this->connection->cleanup();
$this->connection->setup();
}
@@ -104,4 +105,36 @@ class RedisExtIntegrationTest extends TestCase
$this->assertEquals($body, $encoded['body']);
$this->assertEquals($headers, $encoded['headers']);
}
+
+ public function testConnectionGetClaimMessages()
+ {
+ $body = '{"message": "Hi"}';
+ $headers = ['type' => DummyMessage::class];
+
+ $this->connection->add($body, $headers);
+
+ // Read message with other consumer
+ $messages = $this->redis->xreadgroup(
+ $this->connection->getGroup(),
+ 'other-consumer2',
+ [$this->connection->getStream() => '>'],
+ 1
+ );
+
+ // Queue will not have any messages yet
+ $this->assertNull($this->connection->get());
+
+ // Wait for next claim check
+ sleep(1001);
+
+ // Should still be empty
+ $this->assertNull($this->connection->get());
+
+ // Wait for redelivery timeout
+ sleep(2600);
+
+ $encoded = $this->connection->get();
+ $this->assertEquals($body, $encoded['body']);
+ $this->assertEquals($headers, $encoded['headers']);
+ }
}
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
index fea1bb8cbf..959e6b6a8f 100644
--- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
@@ -48,6 +48,7 @@ class Connection
private $redeliverTimeout;
private $nextClaim = 0;
private $claimInterval;
+ private $disabledClaimIdleTime = false;
private $couldHavePendingMessages = true;
public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -74,7 +75,7 @@ class Connection
$this->queue = $this->stream.'__queue';
$this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
$this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
- $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
+ $this->redeliverTimeout = $configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout'];
$this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
}
@@ -167,6 +168,7 @@ class Connection
}
$claimableIds = [];
+
foreach ($pendingMessages as $pendingMessage) {
if ($pendingMessage[1] === $this->consumer) {
$this->couldHavePendingMessages = true;
@@ -174,18 +176,26 @@ class Connection
return;
}
- if ($pendingMessage[2] < (time() - $this->redeliverTimeout)) {
+ $messageTimestampInSeconds = (int) (intval($pendingMessage[0]) / 1000);
+ if ($messageTimestampInSeconds < (time() - $this->redeliverTimeout)) {
$claimableIds[] = $pendingMessage[0];
}
}
if (\count($claimableIds) > 0) {
+ $claimIdleTime = ($this->redeliverTimeout * 1000);
+
+ if ($this->disabledClaimIdleTime) {
+ // For tests it needed to disable the idle timeout
+ $claimIdleTime = 0;
+ }
+
try {
$this->connection->xclaim(
$this->stream,
$this->group,
$this->consumer,
- $this->redeliverTimeout,
+ $claimIdleTime,
$claimableIds,
['JUSTID']
);
@@ -388,9 +398,24 @@ class Connection
return (int) (microtime(true) * 1000);
}
+ public function disableClaimIdleTime(): void
+ {
+ $this->disabledClaimIdleTime = true;
+ }
+
public function cleanup(): void
{
$this->connection->del($this->stream);
$this->connection->del($this->queue);
}
+
+ public function getStream(): string
+ {
+ return $this->stream;
+ }
+
+ public function getGroup(): string
+ {
+ return $this->group;
+ }
} |
src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
Outdated
Show resolved
Hide resolved
@chalasr @alexander-schranz I've updated the |
Which would mean that they are using this change as a new feature, since they would need to update their code. Upgrading to patch versions should not involve code changes. Sorry to insist, but if we are not confident that the default values are good enough for everyone as a bugfix, then this PR should target master. |
I will change the PR to target the |
@chalasr The PR now targets the master branch 👍 |
@chalasr I've rebased again but travisci is still failing because of another |
Thank you @toooni. |
…is) (toooni) This PR was merged into the 5.1-dev branch. Discussion ---------- [Messenger] Add receiving of old pending messages (redis) | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | | License | MIT | Doc PR | symfony/symfony-docs#12976 This PR makes it possible for the redis transport to get abandoned messages from not running/idle consumers by using `XPENDING` and `XCLAIM`. Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we do need to set a `claimTimeout`. The `claimTimeout` defines how long an idle message should be left alone until it will get claimed by the current consumer (Must be a value higher than the longest running handling time of a message or else the message will be handled twice). Using this solution makes the remarks (symfony/symfony-docs#11869 (review)) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name. **Questions** - [x] Which value should we use as default `claimTimeout`? - [x] How should the `claimTimeout` be configured? - [x] Feature or Bugfix? I will create a docs PR and a PR for the other branches as soon as the questions are resolved. Commits ------- 9cb6fdf Implemted receiving of old pending messages
This PR was merged into the 5.1-dev branch. Discussion ---------- [Messenger][Redis] Add missing changelog entry | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | no | Deprecations? | no | Tickets | - | License | MIT | Doc PR | - Missed in #35384 Commits ------- 08fb0c4 [Messenger][Redis] Add missing changelog entry
Is this feature implemented for symfony 4.4? |
Hello @appaydin, no this is part of 5.1 as you see linked above in the Release 5.1. But there is also another way to find out in which release a merged pull request is by clicking in github on the merge commit (look for the merge symbol) and then click on the commit id in this case And then you see there at the top when you expand the |
New features are never backported to patch releases. |
My opinion should not be this new version. Because I'm getting this error right now. I have implemented a workaround by increasing the "startsecs" and "startretries" values with Supervisor. |
@appaydin this is a feature for scaling down consumers. You should as redis recommend always reuse consumer names and make sure that your consumer name is unique. Don't use random consumer names thats not recommended by redis as it will bloat the memory usage of redis. Aslong as you don't need to scale down consumers you should not need this feature of receiving messages of other consumers. |
…ooni) This PR was submitted for the master branch but it was squashed and merged into the 5.1 branch instead. Discussion ---------- Added redeliver_timeout and claim_interval options Adds new optional config parameters introduced by symfony/symfony#35384 Commits ------- 8fe20c3 Added redeliver_timeout and claim_interval options
This PR makes it possible for the redis transport to get abandoned messages from not running/idle consumers by using
XPENDING
andXCLAIM
.Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we do need to set a
claimTimeout
. TheclaimTimeout
defines how long an idle message should be left alone until it will get claimed by the current consumer (Must be a value higher than the longest running handling time of a message or else the message will be handled twice).Using this solution makes the remarks (symfony/symfony-docs#11869 (review)) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name.
Questions
claimTimeout
?claimTimeout
be configured?I will create a docs PR and a PR for the other branches as soon as the questions are resolved.