[Messenger] Add receiving of old pending messages (redis) #35384

Merged
merged 1 commit into from
Feb 8, 2020

toooni
Contributor

@toooni toooni commented Jan 19, 2020

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       |
| License       | MIT
| Doc PR        | symfony/symfony-docs#12976

This PR makes it possible for the redis transport to get abandoned messages from consumers that are no longer running or are idle, by using XPENDING and XCLAIM.
Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we need to set a claimTimeout instead. The claimTimeout defines how long an idle message is left alone before it gets claimed by the current consumer. It must be higher than the longest handling time of any message, or else the message will be handled twice.
Using this solution makes the remarks (symfony/symfony-docs#11869 (review)) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name.

Questions

  • Which value should we use as default claimTimeout?
  • How should the claimTimeout be configured?
  • Feature or Bugfix?

I will create a docs PR and a PR for the other branches as soon as the questions are resolved.

@toooni toooni requested a review from sroze as a code owner January 19, 2020 08:32
@toooni toooni changed the title Use xpending and xclaim to retrieve abandoned messages [Messenger] Redis Transport: Use XPENDING and XCLAIM to retrieve abandoned messages Jan 19, 2020
@chalasr chalasr added this to the 4.4 milestone Jan 19, 2020
@chalasr
Member

chalasr commented Jan 19, 2020

Thanks for the PR. I'll find time to give my thoughts asap.
ping @alexander-schranz also

@alexander-schranz
Contributor

alexander-schranz commented Jan 21, 2020

As discussed with you on Slack, @toooni, this should be moved into a function call inside the following if block:

if ($this->couldHavePendingMessages) {
    $messageId = '0'; // will receive consumers pending messages
}

so e.g.:

        if ($this->couldHavePendingMessages) {
            if ($this->isRandomConsumer) {
                $this->claimOldMessages(); // this function does the xpending and xclaim 
                // the logic after this doesn't need to change as it will read the correct messages
            }
            $messageId = '0'; // will receive consumers pending/claimed messages
        }

As your flag requires a uniqid consumer name, you should set that in the constructor of the connection, here:

$this->consumer = $configuration['consumer'] ?? self::DEFAULT_OPTIONS['consumer'];

e.g.:

if ($configuration['randomconsumer'] === 'true') {
     $this->consumer .= '_'.uniqid();
     $this->isRandomConsumer = true;
}

Currently you will still end up in a race condition, because you still use the same consumer name, so both consumers will claim the same messages.

couldHavePendingMessages should be true at start, and old messages should only be claimed then, not in every loop. So you should not touch that flag in your PR once you move the logic into claimOldMessages as described above.

The whole logic should be behind a flag, e.g. randomConsumer=true, as it is a BC break in how messages are consumed: currently, with one consumer, you can be 100% sure that messages are handled in the order they were created, and this can no longer be guaranteed. As written, it's bad practice to spam Redis with new consumers on every connection, so it's not what I recommend, but I understand that for some people it's annoying to set a unique consumer name themselves. If you want to start multiple processes on the same server, my recommendation would still be to use something like the following instead of a random consumer which claims other consumers' messages:

CONSUMER_NAME=consumer1 bin/console messenger:consume ...
CONSUMER_NAME=consumer2 bin/console messenger:consume ...
CONSUMER_NAME=consumer3 bin/console messenger:consume ...

And for Kubernetes, a StatefulSet should be used. This implementation is something I would only use if you have no control over the env the consumers are started with and you want to have multiple consumers.

@toooni
Contributor Author

toooni commented Jan 21, 2020

@alexander-schranz I don't think so. With the PR the flow looks like the following (It's not everything we talked about):

  • couldHavePendingMessages is false by default
  • XPENDING checks for pending messages
  • if there is a pending message for the current consumer, couldHavePendingMessages is set to true
  • if there is a pending message from another consumer which has been idle for longer than the claimTimeout, the message gets claimed (the XPENDING check on the next run will return this message again for the current consumer, which will then set couldHavePendingMessages to true)
  • depending on couldHavePendingMessages the id 0 or > is set to get a pending message or the next message in the queue
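
The flow in this list can be sketched without a Redis server. This is only an illustration, not the PR's code: `planNextRead()` is a hypothetical helper, and it assumes the pending entries are shaped like the extended `XPENDING` reply (`[message id, consumer name, idle time in ms, delivery count]`).

```php
<?php

/**
 * Hypothetical helper: decides what a single get() call would do.
 *
 * @param array  $pending        entries shaped like the extended XPENDING reply:
 *                               [message id, consumer name, idle time in ms, delivery count]
 * @param string $consumer       name of the current consumer
 * @param int    $claimTimeoutMs idle time after which another consumer's message may be claimed
 *
 * @return array{readId: string, claim: string[]}
 */
function planNextRead(array $pending, string $consumer, int $claimTimeoutMs): array
{
    $couldHavePendingMessages = false;
    $claim = [];

    foreach ($pending as [$id, $owner, $idleMs]) {
        if ($owner === $consumer) {
            // Our own unacknowledged message: read it again with id "0".
            $couldHavePendingMessages = true;
            continue;
        }

        if ($idleMs > $claimTimeoutMs) {
            // Idle for too long under another consumer: candidate for XCLAIM.
            $claim[] = $id;
        }
    }

    return [
        'readId' => $couldHavePendingMessages ? '0' : '>',
        'claim' => $claim,
    ];
}
```

A claimed message does not change `readId` in the same pass. It shows up as owned by the current consumer on the next `XPENDING` check, which matches the fourth bullet.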

This PR currently has nothing to do with random consumer names. I intentionally didn't include this in the PR because it's not directly related to this. The benefit of this PR is that no messages are being lost - even if you are using random consumer names or for example the hostname as the consumer name (or you are just switching to a lower number of consumers). Scaling consumers up or down without this feature is not really possible.

Running XPENDING on every get() call isn't very good for performance, I know. But it makes the whole transport safe against losing messages, whatever you do (like the delay implementation).
The best would be if XPENDING with XCLAIM would be called by a second command regularly but unfortunately this isn't possible.
Not running XPENDING on every get() call and using a timeout or counter to use XPENDING only periodically does not work because of possible low time-limit, memory-limit or limit options of the consume command.

@alexander-schranz
Contributor

alexander-schranz commented Jan 22, 2020

@toooni currently a message is never lost, because on restart couldHavePendingMessages is true and the consumer will read all messages which are not acked. So this statement is not true when the transport is used the correct way.

Using the implementation you provided without a random consumer name could end up in the same error you have in your issue: if two consume commands start at the same time, there is a race condition between XPENDING and XCLAIM. Both processes would have the same consumer name, so they would process the same messages; XCLAIM doesn't do anything in this case, as the old messages belong to the same consumer name. So the only effect is that we stop reading a crashed consumer's pending messages immediately after the crash, which makes the exception a little less common because it happens later, at the time limit. But it can still happen there if the processes start at the same time, so for me this is not a fix for that issue.

That said, if we really want to implement it this way, we should move the XPENDING/XCLAIM part into a function and not call it on every get. I would say on the first call of get, and then whenever the interval has elapsed or a pending message could be old enough to be claimed.

@alexander-schranz
Contributor

alexander-schranz commented Jan 22, 2020

I tested around a little and think we should implement it this way, @toooni:

We only claim messages if there are no pending messages, so we don't change the old behaviour.
On first start it will behave like before: first read its own pending messages and finish them. After that, couldHavePendingMessages is false and it will claim messages of other consumers. With the claimInterval we avoid calling it on every get, and it also becomes possible to disable the behaviour by providing a high claimInterval.

    private $couldHavePendingMessages = true;
    private $lastClaim = 0;
    private $claimInterval = 1000; // not sure which interval we want for checking claimable messages, maybe every 10 seconds? The parameter should be configurable: ?claim-interval=

    public function get()
    {
         // ...

         if (!$this->couldHavePendingMessages && ($this->lastClaim + $this->claimInterval) <= time()) {
               $this->claimOldPendingMessages();
         }

         $messageId = '>'; // will receive new messages
         if ($this->couldHavePendingMessages) {
             $messageId = '0'; // will receive pending messages
         }

         // ...
    }

     private function claimOldPendingMessages()
     {
        try {
            // This could soon be optimized with https://github.com/antirez/redis/issues/5212 or
            // https://github.com/antirez/redis/issues/6256
            $pendingMessages = $this->connection->xpending($this->stream, $this->group, '-', '+', 3);
	
        } catch (\RedisException $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        $claimableIds = [];
        foreach ($pendingMessages as $pendingMessage) {
            if ($pendingMessage[1] === $this->consumer) {
                 $this->couldHavePendingMessages = true;
                 continue;
            }

            if ($pendingMessage[2] < (time() - $this->redeliveryTime)) {
                $claimableIds[] = $pendingMessage[0];
            }
        }

        if (\count($claimableIds) > 0) {
            try {
                $this->connection->xclaim(
                    $this->stream,
                    $this->group,
                    $this->consumer,
                    $this->claimTimeout,
                    $claimableIds,
                    ['JUSTID']
                );
	
                 $this->couldHavePendingMessages = true;
            } catch (\RedisException $e) {
                throw new TransportException($e->getMessage(), 0, $e);
            }
        }

        $this->lastClaim = time();
     }
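
The throttling idea behind `claimInterval` can be isolated into a small pure function. This is a sketch under assumptions, not the proposed implementation: `isClaimDue()` is a hypothetical name, and all values are taken to be in seconds, which the snippet above leaves open.

```php
<?php

/**
 * Hypothetical helper: returns true when a claim check is due.
 * $lastClaim and $now are Unix timestamps, $claimInterval is a duration in seconds.
 */
function isClaimDue(bool $couldHavePendingMessages, int $lastClaim, int $claimInterval, int $now): bool
{
    // The consumer's own pending messages are always handled first.
    if ($couldHavePendingMessages) {
        return false;
    }

    // Only check again once the interval since the last check has elapsed.
    return ($lastClaim + $claimInterval) <= $now;
}
```

With `$lastClaim = 0`, the first call after the consumer's own pending messages are drained is always due, and a very large `$claimInterval` effectively disables claiming after that.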

With that claimInterval it will be possible to disable the behaviour if somebody is using a

@toooni
Contributor Author

toooni commented Jan 22, 2020

Just for traceability:

currently a message is never lost, because on restart couldHavePendingMessages is true and the consumer will read all messages which are not acked. So this statement is not true when the transport is used the correct way.

The messages are lost if there won't be a consumer with the same name again. This isn't only an issue if you want to use random names but also if you want to scale your workers up and down.

@alexander-schranz
Contributor

@toooni

The messages are lost if there won't be a consumer with the same name again. This isn't only an issue if you want to use random names but also if you want to scale your workers up and down.

Oh yeah, you are totally correct about that use case 👍 A few small changes and then it looks good to me.

Contributor

@alexander-schranz alexander-schranz left a comment

@toooni thx for your time and patience. Code looks good from my side; I need to test it in the evening.

@alexander-schranz
Contributor

alexander-schranz commented Jan 22, 2020

@toooni as the XPENDING response doesn't seem to look the same for me, I needed to change how the message timestamp is read. I also needed to disable the claimIdleTime for the integration test, as Symfony's ClockMock has no effect on the Redis min idle time used by XCLAIM.

diff --git a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
index 537c3fba68..04a61e0e9c 100644
--- a/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
+++ b/src/Symfony/Component/Messenger/Tests/Transport/RedisExt/RedisExtIntegrationTest.php
@@ -32,6 +32,7 @@ class RedisExtIntegrationTest extends TestCase
 
         $this->redis = new \Redis();
         $this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
+        $this->connection->disableClaimIdleTime(true);
         $this->connection->cleanup();
         $this->connection->setup();
     }
@@ -104,4 +105,36 @@ class RedisExtIntegrationTest extends TestCase
         $this->assertEquals($body, $encoded['body']);
         $this->assertEquals($headers, $encoded['headers']);
     }
+
+    public function testConnectionGetClaimMessages()
+    {
+        $body = '{"message": "Hi"}';
+        $headers = ['type' => DummyMessage::class];
+
+        $this->connection->add($body, $headers);
+
+        // Read message with other consumer
+        $messages = $this->redis->xreadgroup(
+            $this->connection->getGroup(),
+            'other-consumer2',
+            [$this->connection->getStream() => '>'],
+            1
+        );
+
+        // Queue will not have any messages yet
+        $this->assertNull($this->connection->get());
+
+        // Wait for next claim check
+        sleep(1001);
+
+        // Should still be empty
+        $this->assertNull($this->connection->get());
+
+        // Wait for redelivery timeout
+        sleep(2600);
+
+        $encoded = $this->connection->get();
+        $this->assertEquals($body, $encoded['body']);
+        $this->assertEquals($headers, $encoded['headers']);
+    }
 }
diff --git a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
index fea1bb8cbf..959e6b6a8f 100644
--- a/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
+++ b/src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
@@ -48,6 +48,7 @@ class Connection
     private $redeliverTimeout;
     private $nextClaim = 0;
     private $claimInterval;
+    private $disabledClaimIdleTime = false;
     private $couldHavePendingMessages = true;
 
     public function __construct(array $configuration, array $connectionCredentials = [], array $redisOptions = [], \Redis $redis = null)
@@ -74,7 +75,7 @@ class Connection
         $this->queue = $this->stream.'__queue';
         $this->autoSetup = $configuration['auto_setup'] ?? self::DEFAULT_OPTIONS['auto_setup'];
         $this->maxEntries = $configuration['stream_max_entries'] ?? self::DEFAULT_OPTIONS['stream_max_entries'];
-        $this->redeliverTimeout = ($configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout']) * 1000;
+        $this->redeliverTimeout = $configuration['redeliver_timeout'] ?? self::DEFAULT_OPTIONS['redeliver_timeout'];
         $this->claimInterval = $configuration['claim_interval'] ?? self::DEFAULT_OPTIONS['claim_interval'];
     }
 
@@ -167,6 +168,7 @@ class Connection
         }
 
         $claimableIds = [];
+
         foreach ($pendingMessages as $pendingMessage) {
             if ($pendingMessage[1] === $this->consumer) {
                 $this->couldHavePendingMessages = true;
@@ -174,18 +176,26 @@ class Connection
                 return;
             }
 
-            if ($pendingMessage[2] < (time() - $this->redeliverTimeout)) {
+            $messageTimestampInSeconds = (int) (intval($pendingMessage[0]) / 1000);
+            if ($messageTimestampInSeconds < (time() - $this->redeliverTimeout)) {
                 $claimableIds[] = $pendingMessage[0];
             }
         }
 
         if (\count($claimableIds) > 0) {
+            $claimIdleTime = ($this->redeliverTimeout * 1000);
+
+            if ($this->disabledClaimIdleTime) {
+                // For tests it needed to disable the idle timeout
+                $claimIdleTime = 0;
+            }
+
             try {
                 $this->connection->xclaim(
                     $this->stream,
                     $this->group,
                     $this->consumer,
-                    $this->redeliverTimeout,
+                    $claimIdleTime,
                     $claimableIds,
                     ['JUSTID']
                 );
@@ -388,9 +398,24 @@ class Connection
         return (int) (microtime(true) * 1000);
     }
 
+    public function disableClaimIdleTime(): void
+    {
+        $this->disabledClaimIdleTime = true;
+    }
+
     public function cleanup(): void
     {
         $this->connection->del($this->stream);
         $this->connection->del($this->queue);
     }
+
+    public function getStream(): string
+    {
+        return $this->stream;
+    }
+
+    public function getGroup(): string
+    {
+        return $this->group;
+    }
 }
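
The diff above derives a message's age from its ID. That works because auto-generated Redis stream IDs have the form `<millisecondsTime>-<sequenceNumber>`. Here is a small sketch of the conversion; `streamIdToSeconds()` is a hypothetical name, and it assumes the IDs were generated by Redis rather than set explicitly by the producer.

```php
<?php

/**
 * Hypothetical helper: extracts the Unix timestamp (in seconds)
 * from an auto-generated Redis stream ID such as "1579420800123-0".
 */
function streamIdToSeconds(string $messageId): int
{
    // Everything before the first "-" is the millisecond timestamp.
    [$milliseconds] = explode('-', $messageId, 2);

    return intdiv((int) $milliseconds, 1000);
}
```

Note that this is the time the entry was added to the stream, which is not the same thing as the idle time (time since last delivery) reported by `XPENDING`.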

@toooni
Contributor Author

toooni commented Jan 28, 2020

@chalasr @alexander-schranz I've updated the claim_interval default value to 1 minute. It doesn't make sense to have it lower when the default redeliver_timeout is 1 hour.
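
As a rough illustration of how these two options might be passed, the sketch below builds a transport DSN with both as query parameters. The host, stream name, group name and the helper `buildRedisDsn()` are made up for the example. The units (seconds for `redeliver_timeout`, milliseconds for `claim_interval`) are an assumption that should be checked against the docs PR.

```php
<?php

/**
 * Hypothetical helper: builds a Redis transport DSN of the form
 * redis://host:port/stream/group/consumer?options
 *
 * @param int $redeliverTimeout assumed to be in seconds (1 hour = 3600)
 * @param int $claimInterval    assumed to be in milliseconds (1 minute = 60000)
 */
function buildRedisDsn(string $consumer, int $redeliverTimeout, int $claimInterval): string
{
    $query = http_build_query([
        'redeliver_timeout' => $redeliverTimeout,
        'claim_interval' => $claimInterval,
    ]);

    // "localhost:6379", "messages" and "symfony" are placeholder values.
    return sprintf('redis://localhost:6379/messages/symfony/%s?%s', rawurlencode($consumer), $query);
}
```

Calling `buildRedisDsn(gethostname() ?: 'consumer', 3600, 60000)` would use the hostname as the consumer name, as suggested in the PR description.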

@chalasr
Member

chalasr commented Feb 2, 2020

they might want to set their own values for these options.

Which would mean that they are using this change as a new feature, since they would need to update their code. Upgrading to patch versions should not involve code changes.

Sorry to insist, but if we are not confident that the default values are good enough for everyone as a bugfix, then this PR should target master.
Inlining the default values for 3.4 and exposing these options as a new feature on master (and document it on master as well) is the right tradeoff IMHO.

@toooni
Contributor Author

toooni commented Feb 3, 2020

I will change the PR to target the master branch. But I don't think it makes sense to introduce these changes to 3.4 if there is no way to change the default values.

@toooni toooni changed the base branch from 4.4 to master February 3, 2020 15:25
@toooni toooni changed the title [Messenger] Fix receiving of old pending messages (redis) [Messenger] Add receiving of old pending messages (redis) Feb 3, 2020
@toooni
Contributor Author

toooni commented Feb 3, 2020

@chalasr The PR now targets the master branch 👍

@chalasr chalasr modified the milestones: 4.4, next Feb 4, 2020
@chalasr chalasr added Feature and removed Bug labels Feb 4, 2020
@toooni
Contributor Author

toooni commented Feb 4, 2020

@chalasr I've rebased again but travisci is still failing because of another master branch issue.

@fabpot
Member

fabpot commented Feb 8, 2020

Thank you @toooni.

fabpot added a commit that referenced this pull request Feb 8, 2020
…is) (toooni)

This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger] Add receiving of old pending messages (redis)

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       |
| License       | MIT
| Doc PR        | symfony/symfony-docs#12976

This PR makes it possible for the redis transport to get abandoned messages from not running/idle consumers by using `XPENDING` and `XCLAIM`.
Usually it would be best to let the claiming of pending messages be handled by a separate command. Since the messenger component's commands are fixed, we do need to set a `claimTimeout`. The `claimTimeout` defines how long an idle message should be left alone until it will get claimed by the current consumer (Must be a value higher than the longest running handling time of a message or else the message will be handled twice).
Using this solution makes the remarks (symfony/symfony-docs#11869 (review)) regarding not being able to use the hostname as consumer name obsolete. I would even recommend the hostname as the consumer name.

**Questions**

- [x] Which value should we use as default `claimTimeout`?
- [x] How should the `claimTimeout` be configured?
- [x] Feature or Bugfix?

I will create a docs PR and a PR for the other branches as soon as the questions are resolved.

Commits
-------

9cb6fdf Implemted receiving of old pending messages
@fabpot fabpot merged commit 9cb6fdf into symfony:master Feb 8, 2020
chalasr added a commit that referenced this pull request Feb 11, 2020
This PR was merged into the 5.1-dev branch.

Discussion
----------

[Messenger][Redis] Add missing changelog entry

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | no
| Deprecations? | no
| Tickets       | -
| License       | MIT
| Doc PR        | -

Missed in #35384

Commits
-------

08fb0c4 [Messenger][Redis] Add missing changelog entry
@nicolas-grekas nicolas-grekas modified the milestones: next, 5.1 May 4, 2020
@fabpot fabpot mentioned this pull request May 5, 2020
@toooni toooni deleted the claim_abandoned_messages branch July 3, 2020 13:46
@cesurapp
Contributor

Is this feature implemented for symfony 4.4?

@alexander-schranz
Contributor

alexander-schranz commented Aug 13, 2020

Hello @appaydin,

no, this is part of 5.1, as you can see from the Release 5.1 link above. There is also another way to find out which release contains a merged pull request: on GitHub, click the merge commit (look for the merge symbol) and then click the commit ID, in this case 9cb6fdf:

[Screenshot 2020-08-13 at 10:04:43]

Then, at the top, expand the ... to see which versions contain the commit:

[Screenshot 2020-08-13 at 10:06:34]

@cesurapp
Contributor

Ty, it is a pity that it is not available for 4.4.

@stof
Member

stof commented Aug 14, 2020

New features are never backported to patch releases.

@cesurapp
Contributor

New features are never backported to patch releases.

In my opinion this should not be limited to the new version, because I'm getting this error right now. I have implemented a workaround by increasing the "startsecs" and "startretries" values in Supervisor.

@alexander-schranz
Contributor

@appaydin this is a feature for scaling down consumers. As Redis recommends, you should always reuse consumer names and make sure that each consumer name is unique. Don't use random consumer names; that's not recommended by Redis, as it will bloat Redis's memory usage. As long as you don't need to scale down consumers, you should not need this feature of receiving messages from other consumers.

wouterj added a commit to symfony/symfony-docs that referenced this pull request Oct 23, 2020
…ooni)

This PR was submitted for the master branch but it was squashed and merged into the 5.1 branch instead.

Discussion
----------

Added redeliver_timeout and claim_interval options

Adds new optional config parameters introduced by symfony/symfony#35384

Commits
-------

8fe20c3 Added redeliver_timeout and claim_interval options