Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Added AMQP component #27140

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Added AMQP component
  • Loading branch information
lyrixx committed May 7, 2018
commit 389556d2ac3a257db7300116933c92bca9e48075
3 changes: 2 additions & 1 deletion 3 .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ env:
global:
- MIN_PHP=7.1.3
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/shims/php
- MESSENGER_AMQP_DSN=amqp://localhost/%2f/messages
- MESSENGER_AMQP_DSN=amqp://localhost/
- AMQP_DSN=amqp://localhost/

matrix:
include:
Expand Down
1 change: 1 addition & 0 deletions 1 composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"symfony/polyfill-php72": "~1.5"
},
"replace": {
"symfony/amqp": "self.version",
"symfony/asset": "self.version",
"symfony/browser-kit": "self.version",
"symfony/cache": "self.version",
Expand Down
1 change: 1 addition & 0 deletions 1 phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<env name="LDAP_PORT" value="3389" />
<env name="REDIS_HOST" value="localhost" />
<env name="MEMCACHED_HOST" value="localhost" />
<env name="AMQP_DSN" value="amqp://localhost/symfony_test_amqp" />
</php>

<testsuites>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public function getConfigTreeBuilder()
$this->addWebLinkSection($rootNode);
$this->addLockSection($rootNode);
$this->addMessengerSection($rootNode);
$this->addAmqpSection($rootNode);

return $treeBuilder;
}
Expand Down Expand Up @@ -1071,4 +1072,152 @@ function ($a) {
->end()
;
}

private function addAmqpSection($rootNode)
{
$rootNode
->children()
->arrayNode('amqp')
->fixXmlConfig('connection')
->children()
->arrayNode('connections')
->addDefaultChildrenIfNoneSet('default')
->useAttributeAsKey('name')
->prototype('array')
->fixXmlConfig('exchange')
->fixXmlConfig('queue')
->children()
->scalarNode('name')
->cannotBeEmpty()
->end()
->scalarNode('dsn')
->cannotBeEmpty()
->defaultValue('amqp://guest:guest@localhost:5672/symfony')
->end()
->arrayNode('exchanges')
->prototype('array')
->fixXmlConfig('argument')
->children()
->scalarNode('name')
->isRequired()
->cannotBeEmpty()
->end()
->variableNode('arguments')
->defaultValue(array())
// Deal with XML config
->beforeNormalization()
->always()
->then(function ($v) {
return $this->fixXmlArguments($v);
})
->end()
->validate()
->ifTrue(function ($v) {
return !is_array($v);
})
->thenInvalid('Arguments must be an array (got %s).')
->end()
->end()
->end()
->end()
->end()
->arrayNode('queues')
->prototype('array')
->fixXmlConfig('argument')
->children()
->scalarNode('name')
->isRequired()
->cannotBeEmpty()
->end()
->variableNode('arguments')
->defaultValue(array())
// Deal with XML config
->beforeNormalization()
->always()
->then(function ($v) {
return $this->fixXmlArguments($v);
})
->end()
->validate()
->ifTrue(function ($v) {
return !is_array($v);
})
->thenInvalid('Arguments must be an array (got %s).')
->end()
->end()
->enumNode('retry_strategy')
->values(array(null, 'constant', 'exponential'))
->defaultNull()
->end()
->variableNode('retry_strategy_options')
->validate()
->ifTrue(function ($v) {
return !is_array($v);
})
->thenInvalid('Arguments must be an array (got %s).')
->end()
->end()
->arrayNode('thresholds')
->addDefaultsIfNotSet()
->children()
->integerNode('warning')->defaultNull()->end()
->integerNode('critical')->defaultNull()->end()
->end()
->end()
->end()
->validate()
->ifTrue(function ($config) {
return 'constant' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of these, can't we explicitly set the retry strategy options in retry_strategy.constant and retry_strategy.exponential? The only thing to validate after is that we only have one retry strategy.

})
->thenInvalid('"max" of "retry_strategy_options" must be set for constant retry strategy.')
->end()
->validate()
->ifTrue(function ($config) {
return 'constant' === $config['retry_strategy'] && !array_key_exists('time', $config['retry_strategy_options']);
})
->thenInvalid('"time" of "retry_strategy_options" must be set for constant retry strategy.')
->end()
->validate()
->ifTrue(function ($config) {
return 'exponential' === $config['retry_strategy'] && !array_key_exists('max', $config['retry_strategy_options']);
})
->thenInvalid('"max" of "retry_strategy_options" must be set for exponential retry strategy.')
->end()
->validate()
->ifTrue(function ($config) {
return 'exponential' === $config['retry_strategy'] && !array_key_exists('offset', $config['retry_strategy_options']);
})
->thenInvalid('"offset" of "retry_strategy_options" must be set for exponential retry strategy.')
->end()
->end()
->end()
->end()
->end()
->end()
->scalarNode('default_connection')
->cannotBeEmpty()
->end()
->end()
->end()
->end()
;
}

private function fixXmlArguments($v)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's exactly fixed by this method? Can't you simply have an XSD that allows any kind of data like the rest of the "free options"? 🤔

{
if (!is_array($v)) {
return $v;
}

$tmp = array();

foreach ($v as $key => $value) {
if (!isset($value['key']) && !isset($value['value'])) {
return $v;
}
$tmp[$value['key']] = $value['value'];
}

return $tmp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Bundle\FrameworkBundle\Routing\AnnotatedRouteControllerLoader;
use Symfony\Bundle\FullStack;
use Symfony\Component\Amqp\Broker;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alpha order

use Symfony\Component\Cache\Adapter\AbstractAdapter;
use Symfony\Component\Cache\Adapter\AdapterInterface;
use Symfony\Component\Cache\Adapter\ArrayAdapter;
Expand Down Expand Up @@ -251,6 +252,9 @@ public function load(array $configs, ContainerBuilder $container)
$this->registerRouterConfiguration($config['router'], $container, $loader);
$this->registerAnnotationsConfiguration($config['annotations'], $container, $loader);
$this->registerPropertyAccessConfiguration($config['property_access'], $container, $loader);
if (isset($config['amqp'])) {
$this->registerAmqpConfiguration($config['amqp'], $container, $loader);
}

if ($this->isConfigEnabled($container, $config['serializer'])) {
if (!class_exists('Symfony\Component\Serializer\Serializer')) {
Expand Down Expand Up @@ -1218,6 +1222,40 @@ private function registerPropertyAccessConfiguration(array $config, ContainerBui
;
}

private function registerAmqpConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
{
$loader->load('amqp.xml');

$defaultConnectionName = $config['default_connection'] ?? null;

$match = false;
foreach ($config['connections'] as $name => $connection) {
$container
->register("amqp.broker.$name", Broker::class)
->setFactory(array(Broker::class, 'createWithDsn'))
->addArgument($connection['dsn'])
->addArgument($connection['queues'])
->addArgument($connection['exchanges'])
;

if (!$defaultConnectionName) {
$defaultConnectionName = $name;
}
if ($defaultConnectionName === $name) {
$match = true;
}
}

if (!$match) {
throw new \InvalidArgumentException(sprintf('The "framework.amqp.default_connection" option "%s" does not exist.', $defaultConnectionName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"The default connection configured ("%s") does not exist." ?

}

$container
->setAlias('amqp.broker', "amqp.broker.$defaultConnectionName")
->setPublic(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really?
how about an alias for autowiring also?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's quite common to publish a message from a controller or a command
About the alias: What should I alias ? the Broker::class to the default connection ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What should I alias ? the Broker::class to the default connection ?

Yep 👍

;
}

private function registerSecurityCsrfConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
{
if (!$this->isConfigEnabled($container, $config)) {
Expand Down
17 changes: 17 additions & 0 deletions 17 src/Symfony/Bundle/FrameworkBundle/Resources/config/amqp.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd">

<services>
<defaults public="false" />

<service id="amqp.command.move" class="Symfony\Component\Amqp\Command\AmqpMoveCommand">
<argument type="service" id="amqp.broker" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should use a connection locator here I guess, in case there are multiple connections defined.

<argument type="service" id="logger" on-invalid="null" />
<tag name="console.command" command="amqp:move" />
</service>

</services>
</container>
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<xsd:element name="php-errors" type="php-errors" minOccurs="0" maxOccurs="1" />
<xsd:element name="lock" type="lock" minOccurs="0" maxOccurs="1" />
<xsd:element name="messenger" type="messenger" minOccurs="0" maxOccurs="1" />
<xsd:element name="amqp" type="amqp" minOccurs="0" maxOccurs="1" />
</xsd:choice>

<xsd:attribute name="http-method-override" type="xsd:boolean" />
Expand Down Expand Up @@ -396,4 +397,50 @@
<xsd:attribute name="name" type="xsd:string" use="required"/>
<xsd:attribute name="default-middleware" type="xsd:boolean"/>
</xsd:complexType>

<xsd:complexType name="amqp">
<xsd:choice maxOccurs="unbounded">
<xsd:element name="connection" type="amqp_connection" minOccurs="0" maxOccurs="unbounded" />
</xsd:choice>

<xsd:attribute name="default_connection" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="amqp_connection">
<xsd:choice maxOccurs="unbounded">
<xsd:element name="queue" type="amqp_queue" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="exchange" type="amqp_exchange" minOccurs="0" maxOccurs="unbounded" />
</xsd:choice>

<xsd:attribute name="dsn" type="xsd:string" />
<xsd:attribute name="name" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="amqp_queue">
<xsd:choice maxOccurs="unbounded">
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" />
<xsd:element name="retry-strategy-options" type="amqp_queue_retry_strategy_options" minOccurs="0" maxOccurs="1" />
</xsd:choice>
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="retry-strategy" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="amqp_argument">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use metadata instead of this type. See messenger_transport.options

<xsd:attribute name="key" type="xsd:string" />
<xsd:attribute name="value" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="amqp_queue_retry_strategy_options">
<xsd:attribute name="key" type="xsd:string" />
<xsd:attribute name="offset" type="xsd:string" />
<xsd:attribute name="max" type="xsd:string" />
<xsd:attribute name="time" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="amqp_exchange">
<xsd:choice maxOccurs="unbounded">
<xsd:element name="argument" type="amqp_argument" minOccurs="0" maxOccurs="unbounded" />
</xsd:choice>
<xsd:attribute name="name" type="xsd:string" />
</xsd:complexType>
</xsd:schema>
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?php

$container->loadFromExtension('framework', array(
'amqp' => array(
),
));
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

$container->loadFromExtension('framework', array(
'amqp' => array(
'connections' => array(
'queue_staging' => array(
'dsn' => 'amqp://foo:baz@rabbitmq:1234/staging',
),
'queue_prod' => array(
'dsn' => 'amqp://foo:bar@rabbitmq:1234/prod',
'queues' => array(
array(
'name' => 'retry_strategy_exponential',
'retry_strategy' => 'exponential',
'retry_strategy_options' => array('offset' => 1, 'max' => 3),
),
array(
'name' => 'arguments',
'arguments' => array(
'routing_keys' => 'my_routing_key',
'flags' => 2,
),
),
),
'exchanges' => array(
array(
'name' => 'headers',
'arguments' => array(
'type' => 'headers',
),
),
),
),
),
'default_connection' => 'queue_prod',
),
));
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services http://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony http://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:amqp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/> should be enough.

</framework:amqp>
</framework:config>
</container>
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.