-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
Added AMQP component #27140
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added AMQP component #27140
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -104,6 +104,7 @@ public function getConfigTreeBuilder() | |
$this->addWebLinkSection($rootNode); | ||
$this->addLockSection($rootNode); | ||
$this->addMessengerSection($rootNode); | ||
$this->addAmqpSection($rootNode); | ||
|
||
return $treeBuilder; | ||
} | ||
|
@@ -1071,4 +1072,152 @@ function ($a) { | |
->end() | ||
; | ||
} | ||
|
||
private function addAmqpSection($rootNode) | ||
{ | ||
$rootNode | ||
->children() | ||
->arrayNode('amqp') | ||
->fixXmlConfig('connection') | ||
->children() | ||
->arrayNode('connections') | ||
->addDefaultChildrenIfNoneSet('default') | ||
->useAttributeAsKey('name') | ||
->prototype('array') | ||
->fixXmlConfig('exchange') | ||
->fixXmlConfig('queue') | ||
->children() | ||
->scalarNode('name') | ||
->cannotBeEmpty() | ||
->end() | ||
->scalarNode('dsn') | ||
->cannotBeEmpty() | ||
->defaultValue('amqp://guest:guest@localhost:5672/symfony') | ||
->end() | ||
->arrayNode('exchanges') | ||
->prototype('array') | ||
->fixXmlConfig('argument') | ||
->children() | ||
->scalarNode('name') | ||
->isRequired() | ||
->cannotBeEmpty() | ||
->end() | ||
->variableNode('arguments') | ||
->defaultValue(array()) | ||
// Deal with XML config | ||
->beforeNormalization() | ||
->always() | ||
->then(function ($v) { | ||
return $this->fixXmlArguments($v); | ||
}) | ||
->end() | ||
->validate() | ||
->ifTrue(function ($v) { | ||
return !is_array($v); | ||
}) | ||
->thenInvalid('Arguments must be an array (got %s).') | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
->arrayNode('queues') | ||
->prototype('array') | ||
->fixXmlConfig('argument') | ||
->children() | ||
->scalarNode('name') | ||
->isRequired() | ||
->cannotBeEmpty() | ||
->end() | ||
->variableNode('arguments') | ||
->defaultValue(array()) | ||
// Deal with XML config | ||
->beforeNormalization() | ||
->always() | ||
->then(function ($v) { | ||
return $this->fixXmlArguments($v); | ||
}) | ||
->end() | ||
->validate() | ||
->ifTrue(function ($v) { | ||
return !is_array($v); | ||
}) | ||
->thenInvalid('Arguments must be an array (got %s).') | ||
->end() | ||
->end() | ||
->enumNode('retry_strategy') | ||
->values(array(null, 'constant', 'exponential')) | ||
->defaultNull() | ||
->end() | ||
->variableNode('retry_strategy_options') | ||
->validate() | ||
->ifTrue(function ($v) { | ||
return !is_array($v); | ||
}) | ||
->thenInvalid('Arguments must be an array (got %s).') | ||
->end() | ||
->end() | ||
->arrayNode('thresholds') | ||
->addDefaultsIfNotSet() | ||
->children() | ||
->integerNode('warning')->defaultNull()->end() | ||
->integerNode('critical')->defaultNull()->end() | ||
->end() | ||
->end() | ||
->end() | ||
->validate() | ||
->ifTrue(function ($config) { | ||
return 'constant' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']); | ||
}) | ||
->thenInvalid('"max" of "retry_strategy_options" must be set for constant retry strategy.') | ||
->end() | ||
->validate() | ||
->ifTrue(function ($config) { | ||
return 'constant' === $config['retry_strategy'] && !array_key_exists('time', $config['retry_strategy_options']); | ||
}) | ||
->thenInvalid('"time" of "retry_strategy_options" must be set for constant retry strategy.') | ||
->end() | ||
->validate() | ||
->ifTrue(function ($config) { | ||
return 'exponential' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']); | ||
}) | ||
->thenInvalid('"max" of "retry_strategy_options" must be set for exponential retry strategy.') | ||
->end() | ||
->validate() | ||
->ifTrue(function ($config) { | ||
return 'exponential' === $config['retry_strategy'] && !array_key_exists('offset', $config['retry_strategy_options']); | ||
}) | ||
->thenInvalid('"offset" of "retry_strategy_options" must be set for exponential retry strategy.') | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
->scalarNode('default_connection') | ||
->cannotBeEmpty() | ||
->end() | ||
->end() | ||
->end() | ||
->end() | ||
; | ||
} | ||
|
||
private function fixXmlArguments($v) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's exactly fixed by this method? Can't you simply have an XSD that allows any kind of data like the rest of the "free options"? 🤔 |
||
{ | ||
if (!is_array($v)) { | ||
return $v; | ||
} | ||
|
||
$tmp = array(); | ||
|
||
foreach ($v as $key => $value) { | ||
if (!isset($value['key']) && !isset($value['value'])) { | ||
return $v; | ||
} | ||
$tmp[$value['key']] = $value['value']; | ||
} | ||
|
||
return $tmp; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
use Symfony\Bundle\FrameworkBundle\Controller\Controller; | ||
use Symfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader; | ||
use Symfony\Bundle\FullStack; | ||
use Symfony\Component\Amqp\Broker; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. alpha order |
||
use Symfony\Component\Cache\Adapter\AbstractAdapter; | ||
use Symfony\Component\Cache\Adapter\AdapterInterface; | ||
use Symfony\Component\Cache\Adapter\ArrayAdapter; | ||
|
@@ -251,6 +252,9 @@ public function load(array $configs, ContainerBuilder $container) | |
$this->registerRouterConfiguration($config['router'], $container, $loader); | ||
$this->registerAnnotationsConfiguration($config['annotations'], $container, $loader); | ||
$this->registerPropertyAccessConfiguration($config['property_access'], $container, $loader); | ||
if (isset($config['amqp'])) { | ||
$this->registerAmqpConfiguration($config['amqp'], $container, $loader); | ||
} | ||
|
||
if ($this->isConfigEnabled($container, $config['serializer'])) { | ||
if (!class_exists('Symfony\Component\Serializer\Serializer')) { | ||
|
@@ -1218,6 +1222,40 @@ private function registerPropertyAccessConfiguration(array $config, ContainerBui | |
; | ||
} | ||
|
||
private function registerAmqpConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader) | ||
{ | ||
$loader->load('amqp.xml'); | ||
|
||
$defaultConnectionName = $config['default_connection'] ?? null; | ||
|
||
$match = false; | ||
foreach ($config['connections'] as $name => $connection) { | ||
$container | ||
->register("amqp.broker.$name", Broker::class) | ||
->setFactory(array(Broker::class, 'createWithDsn')) | ||
->addArgument($connection['dsn']) | ||
->addArgument($connection['queues']) | ||
->addArgument($connection['exchanges']) | ||
; | ||
|
||
if (!$defaultConnectionName) { | ||
$defaultConnectionName = $name; | ||
} | ||
if ($defaultConnectionName === $name) { | ||
$match = true; | ||
} | ||
} | ||
|
||
if (!$match) { | ||
throw new \InvalidArgumentException(sprintf('The "framework.amqp.default_connection" option "%s" does not exist.', $defaultConnectionName)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "The default connection configured ("%s") does not exist." ? |
||
} | ||
|
||
$container | ||
->setAlias('amqp.broker', "amqp.broker.$defaultConnectionName") | ||
->setPublic(true) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. really? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's quite common to publish a message from a controller or a command There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yep 👍 |
||
; | ||
} | ||
|
||
private function registerSecurityCsrfConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader) | ||
{ | ||
if (!$this->isConfigEnabled($container, $config)) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
<?xml version="1.0" ?> | ||
|
||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd"> | ||
|
||
<services> | ||
<defaults public="false" /> | ||
|
||
<service id="amqp.command.move" class="Symfony\Component\Amqp\Command\AmqpMoveCommand"> | ||
<argument type="service" id="amqp.broker" /> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You should use a connection locator here I guess, in case there are multiple connections defined. |
||
<argument type="service" id="logger" on-invalid="null" /> | ||
<tag name="console.command" command="amqp:move" /> | ||
</service> | ||
|
||
</services> | ||
</container> |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,7 @@ | |
<xsd:element name="php-errors" type="php-errors" minOccurs="0" maxOccurs="1" /> | ||
<xsd:element name="lock" type="lock" minOccurs="0" maxOccurs="1" /> | ||
<xsd:element name="messenger" type="messenger" minOccurs="0" maxOccurs="1" /> | ||
<xsd:element name="amqp" type="amqp" minOccurs="0" maxOccurs="1" /> | ||
</xsd:choice> | ||
|
||
<xsd:attribute name="http-method-override" type="xsd:boolean" /> | ||
|
@@ -396,4 +397,50 @@ | |
<xsd:attribute name="name" type="xsd:string" use="required"/> | ||
<xsd:attribute name="default-middleware" type="xsd:boolean"/> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp"> | ||
<xsd:choice maxOccurs="unbounded"> | ||
<xsd:element name="connection" type="amqp_connection" minOccurs="0" maxOccurs="unbounded" /> | ||
</xsd:choice> | ||
|
||
<xsd:attribute name="default_connection" type="xsd:string" /> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp_connection"> | ||
<xsd:choice maxOccurs="unbounded"> | ||
<xsd:element name="queue" type="amqp_queue" minOccurs="0" maxOccurs="unbounded" /> | ||
<xsd:element name="exchange" type="amqp_exchange" minOccurs="0" maxOccurs="unbounded" /> | ||
</xsd:choice> | ||
|
||
<xsd:attribute name="dsn" type="xsd:string" /> | ||
<xsd:attribute name="name" type="xsd:string" /> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp_queue"> | ||
<xsd:choice maxOccurs="unbounded"> | ||
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" /> | ||
<xsd:element name="retry-strategy-options" type="amqp_queue_retry_strategy_options" minOccurs="0" maxOccurs="1" /> | ||
</xsd:choice> | ||
<xsd:attribute name="name" type="xsd:string" /> | ||
<xsd:attribute name="retry-strategy" type="xsd:string" /> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp_argument"> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd use |
||
<xsd:attribute name="key" type="xsd:string" /> | ||
<xsd:attribute name="value" type="xsd:string" /> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp_queue_retry_strategy_options"> | ||
<xsd:attribute name="key" type="xsd:string" /> | ||
<xsd:attribute name="offset" type="xsd:string" /> | ||
<xsd:attribute name="max" type="xsd:string" /> | ||
<xsd:attribute name="time" type="xsd:string" /> | ||
</xsd:complexType> | ||
|
||
<xsd:complexType name="amqp_exchange"> | ||
<xsd:choice maxOccurs="unbounded"> | ||
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" /> | ||
</xsd:choice> | ||
<xsd:attribute name="name" type="xsd:string" /> | ||
</xsd:complexType> | ||
</xsd:schema> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
<?php | ||
|
||
$container->loadFromExtension('framework', array( | ||
'amqp' => array( | ||
), | ||
)); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
<?php | ||
|
||
$container->loadFromExtension('framework', array( | ||
'amqp' => array( | ||
'connections' => array( | ||
'queue_staging' => array( | ||
'dsn' => 'amqp://foo:baz@rabbitmq:1234/staging', | ||
), | ||
'queue_prod' => array( | ||
'dsn' => 'amqp://foo:bar@rabbitmq:1234/prod', | ||
'queues' => array( | ||
array( | ||
'name' => 'retry_strategy_exponential', | ||
'retry_strategy' => 'exponential', | ||
'retry_strategy_options' => array('offset' => 1, 'max' => 3), | ||
), | ||
array( | ||
'name' => 'arguments', | ||
'arguments' => array( | ||
'routing_keys' => 'my_routing_key', | ||
'flags' => 2, | ||
), | ||
), | ||
), | ||
'exchanges' => array( | ||
array( | ||
'name' => 'headers', | ||
'arguments' => array( | ||
'type' => 'headers', | ||
), | ||
), | ||
), | ||
), | ||
), | ||
'default_connection' => 'queue_prod', | ||
), | ||
)); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
<?xml version="1.0" ?> | ||
|
||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns:framework="http://symfony.com/schema/dic/symfony" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd | ||
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> | ||
|
||
<framework:config> | ||
<framework:amqp> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
</framework:amqp> | ||
</framework:config> | ||
</container> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of these, can't we explicitly set the retry strategy options in
retry_strategy.constant
andretry_strategy.exponential
? The only thing to validate after is that we only have one retry strategy.