Description
I want to add message queue de-duplication based on a message content hash to the Doctrine Transport, since SQL databases have a natural way to do it: the `INSERT INTO ... ON DUPLICATE KEY` statement (and analogues where supported).
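As a rough sketch of what such statements could look like per platform: the helper name `buildDeduplicatingInsert()`, the `hash` column and the platform labels are assumptions made for illustration, not existing Doctrine Transport code. It assumes a unique index on the `hash` column.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns an INSERT that leaves the existing row alone
 * when the `hash` value is already present. Table and column names are
 * assumptions.
 */
function buildDeduplicatingInsert(string $platform, string $table = 'messenger_messages'): string
{
    $insert = sprintf(
        'INSERT INTO %s (body, headers, queue_name, created_at, available_at, hash) VALUES (?, ?, ?, ?, ?, ?)',
        $table
    );

    return match ($platform) {
        // MySQL/MariaDB: a no-op update keeps the existing row untouched.
        'mysql', 'mariadb' => $insert.' ON DUPLICATE KEY UPDATE body = body',
        // PostgreSQL (9.5+) and SQLite (3.24+) can skip the conflicting row.
        'postgresql', 'sqlite' => $insert.' ON CONFLICT (hash) DO NOTHING',
        default => throw new InvalidArgumentException(sprintf('No deduplicating insert known for "%s".', $platform)),
    };
}

echo buildDeduplicatingInsert('mysql'), PHP_EOL;
echo buildDeduplicatingInsert('postgresql'), PHP_EOL;
```

Oracle and MSSQL are deliberately left to the `default` branch, since their equivalent (likely a `MERGE` statement) still needs input from people who know those platforms.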
You could use the Lock component to lock and release, either in a custom middleware or inside the handlers themselves. For Doctrine transports, however, that comes with significant performance costs: it requires at least one additional table and additional queries, and it brings race conditions that are hard to overcome, especially in high-load scenarios with many workers running in parallel across many servers.
RabbitMQ and Amazon SQS have built-in queue de-duplication that can be used (I didn't find any mention of it for Beanstalkd).
Redis has a big warning that you can't run more than one consumer per specific combination of stream, group and consumer, so parallel processing of messages does not seem to be an issue there because of how Redis works for queues: consumers always run in sequence.
My idea for Doctrine-based transports is pretty simple: we add a nullable `hash` field to the `messenger_messages` table with a unique index on it, with the default hash implementation being `$hash = hash('whatever we pick, md5?', sprintf('%s-%s-%s', $queueName, get_class($messageObject), json_encode($messageObject)))`. On a duplicate, since we are trying to insert the same body for the same queue and the same message type, we just do `ON DUPLICATE KEY UPDATE body = body`: the body is identical, so we don't need to change it. This definitely works for MySQL, MariaDB and PostgreSQL (the latter through its `ON CONFLICT` analogue). I don't really have any experience with Oracle or MSSQL, so for those I would like to ask people in the know to chip in.
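A minimal sketch of the default hash described above. The function name `computeDeduplicationHash()` and the `SendReminder` message class are hypothetical, `sha256` is only a placeholder for whichever algorithm gets picked, and `json_encode()` stands in for the body that the transport serializer would produce.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: builds the deduplication hash from the queue name,
 * the message class and the JSON-encoded message.
 */
function computeDeduplicationHash(string $queueName, object $message, string $algo = 'sha256'): string
{
    // json_encode() only sees public properties (or JsonSerializable output);
    // a real implementation would reuse the body from the transport serializer.
    $body = json_encode($message, JSON_THROW_ON_ERROR);

    return hash($algo, sprintf('%s-%s-%s', $queueName, get_class($message), $body));
}

// Hypothetical message class used only for this example.
final class SendReminder
{
    public function __construct(public int $userId, public string $channel) {}
}

$a = computeDeduplicationHash('default', new SendReminder(42, 'email'));
$b = computeDeduplicationHash('default', new SendReminder(42, 'email'));
$c = computeDeduplicationHash('priority', new SendReminder(42, 'email'));

var_dump($a === $b); // bool(true): same queue, class and body
var_dump($a === $c); // bool(false): different queue
```

Two dispatches of an identical message to the same queue produce the same hash, so the second insert would hit the unique index; the same message on another queue gets its own row.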
Why is this good?
- A nullable field with `NULL` as the default makes it optional, so you can configure deduplication per queue, or even per message type if we want to implement it like that.
- We are not adding any additional locks, which matters especially in a multi-server consumer environment with a single queue server. There is nothing extra to set up: you get it out of the box when enabled, at least for the Doctrine Transport.
- We add minimal processing to the hot path: the serializer is already creating the JSON to insert into the database, so ideally it's just a hash calculation with some simple logic.
- It's high-throughput friendly, since the database handles conflicts (avoiding the contention issues that come with locks).
- Long-running or delayed messages do not have issues with long TTLs. For example, my system has messages whose delivery can be delayed by up to 10 days; maintaining locks for those via the Lock component is not going to be great, especially at a run rate of 200-300 messages a second.
As long as the message sits in the queue table, it gets deduplicated automatically until it's processed, whatever its run time is, without relying on any 3rd party systems.
And we can provide a `DeduplicationKeyStamp` stamp for people to use their own logic for key combos: if they want message-type-based deduplication instead of message type + body + queue name, that is easy to implement. It could even just be a callback.
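One way such a stamp could work, as a sketch only: the `getKey()` accessor, the `resolveDeduplicationHash()` function and the `SyncInventory` message are hypothetical, and `sha256` is a placeholder algorithm. Inside Symfony the stamp would implement `Symfony\Component\Messenger\Stamp\StampInterface`; the interface is left out here so the example runs without the framework.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical stamp carrying a caller-supplied deduplication key.
 * In Symfony it would implement Messenger's StampInterface.
 */
final class DeduplicationKeyStamp
{
    public function __construct(private string $key) {}

    public function getKey(): string
    {
        return $this->key;
    }
}

/**
 * Hypothetical resolver: an explicit stamp wins, otherwise fall back to the
 * default "queue + class + body" key. Returns the value for the hash column.
 */
function resolveDeduplicationHash(string $queueName, object $message, ?DeduplicationKeyStamp $stamp = null): string
{
    $key = $stamp !== null
        ? $stamp->getKey()
        : sprintf('%s-%s-%s', $queueName, get_class($message), json_encode($message, JSON_THROW_ON_ERROR));

    return hash('sha256', $key);
}

// Hypothetical message class used only for this example.
final class SyncInventory
{
    public function __construct(public int $warehouseId, public string $requestedBy) {}
}

// Deduplicate by message type only: the body is ignored.
$typeOnly = new DeduplicationKeyStamp(SyncInventory::class);

$first = resolveDeduplicationHash('default', new SyncInventory(1, 'alice'), $typeOnly);
$second = resolveDeduplicationHash('default', new SyncInventory(2, 'bob'), $typeOnly);

var_dump($first === $second); // bool(true): both collapse onto one queue row
```

Without the stamp, the two messages above would hash differently because their bodies differ, which is the default behaviour described earlier.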
I was told that this would probably end up as a more general implementation in an abstract layer. I'm fine if it goes that way, but I see a problem: some transports afford far more efficient ways to do it than others. SQS and RabbitMQ have deduplication in the software itself and the Doctrine Transport can leverage SQL features, but things like Redis and Beanstalkd will definitely have to rely on external locks (the Lock component). I'm not sure there's a good way to make it non-transport-specific; at the least, each transport would have its own implementation under the hood behind some generic deduplication stamp. I was thinking of it more as a configuration-level thing, maybe on the message type declaration or as a queue-based setting (perhaps just applying a stamp automatically via config options so you don't have to add it manually on every dispatch in the code).
At least for the Doctrine Transport, I'm available to do the implementation myself (contributing some of my company's time to it).
Example
No response