-
-
Notifications
You must be signed in to change notification settings - Fork 5.2k
[Messenger] Describe the doctrine transport #10616
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
vincenttouzet
wants to merge
1
commit into
symfony:master
from
vincenttouzet:messenger-doctrine-transport
Closed
[Messenger] Describe the doctrine transport #10616
vincenttouzet
wants to merge
1
commit into
symfony:master
from
vincenttouzet:messenger-doctrine-transport
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
7 tasks
5a19dd2
to
9bbc360
Compare
bca4f9b
to
ec267d1
Compare
symfony-splitter
pushed a commit
to symfony/messenger
that referenced
this pull request
Mar 31, 2019
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c828 [Messenger] Add a Doctrine transport
sroze
added a commit
to symfony/symfony
that referenced
this pull request
Mar 31, 2019
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c [Messenger] Add a Doctrine transport
ec267d1
to
4f4ac88
Compare
Should we create for each transport a own documentation file? In this case we should also create a documentation file about the amqp implementation. /cc @sroze #EU-FOSSA |
OskarStark
requested changes
Apr 7, 2019
4f4ac88
to
5d9686c
Compare
90f9fd8
to
322955d
Compare
322955d
to
4e65148
Compare
Thank you Vincent for this great feature and its docs! |
symfony-splitter
pushed a commit
to symfony/messenger
that referenced
this pull request
Jan 28, 2020
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c828 [Messenger] Add a Doctrine transport
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Add documentation relative to the PR symfony/symfony#29007
TODO