Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9d43404

Browse filesBrowse files
committed
Added two new components: AMQP and Worker
1 parent 06da874 commit 9d43404
Copy full SHA for 9d43404

File tree

76 files changed

+5404
-1
lines changed
Filter options

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner

76 files changed

+5404
-1
lines changed

‎.travis.yml

Copy file name to clipboardExpand all lines: .travis.yml
+4Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ env:
1616
global:
1717
- MIN_PHP=5.5.9
1818
- SYMFONY_PROCESS_PHP_TEST_BINARY=~/.phpenv/versions/5.6/bin/php
19+
- RABBITMQ_URL=amqp://guest:guest@localhost:5672/
1920

2021
matrix:
2122
include:
@@ -41,6 +42,7 @@ services:
4142
- memcached
4243
- mongodb
4344
- redis-server
45+
- rabbitmq
4446

4547
before_install:
4648
- |
@@ -82,6 +84,7 @@ before_install:
8284
echo apc.enable_cli = 1 >> $INI
8385
echo extension = ldap.so >> $INI
8486
echo extension = redis.so >> $INI
87+
echo extension = amqp.so >> $INI
8588
echo extension = memcached.so >> $INI
8689
[[ $PHP = 5.* ]] && echo extension = memcache.so >> $INI
8790
if [[ $PHP = 5.* ]]; then
@@ -159,6 +162,7 @@ install:
159162
160163
- if [[ ! $skip ]]; then $COMPOSER_UP; fi
161164
- if [[ ! $skip ]]; then ./phpunit install; fi
165+
- src/Symfony/Component/Amqp/bin/reset.php force
162166
- |
163167
# phpinfo
164168
if [[ ! $PHP = hhvm* ]]; then php -i; else hhvm --php -r 'print_r($_SERVER);print_r(ini_get_all());'; fi

‎composer.json

Copy file name to clipboardExpand all lines: composer.json
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"symfony/polyfill-util": "~1.0"
3333
},
3434
"replace": {
35+
"symfony/amqp": "self.version",
3536
"symfony/asset": "self.version",
3637
"symfony/browser-kit": "self.version",
3738
"symfony/cache": "self.version",
@@ -81,6 +82,7 @@
8182
"symfony/web-link": "self.version",
8283
"symfony/web-profiler-bundle": "self.version",
8384
"symfony/web-server-bundle": "self.version",
85+
"symfony/worker": "self.version",
8486
"symfony/workflow": "self.version",
8587
"symfony/yaml": "self.version"
8688
},

‎phpunit.xml.dist

Copy file name to clipboardExpand all lines: phpunit.xml.dist
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
<env name="LDAP_PORT" value="3389" />
2020
<env name="REDIS_HOST" value="localhost" />
2121
<env name="MEMCACHED_HOST" value="localhost" />
22+
<env name="RABBITMQ_URL" value="amqp://guest:guest@localhost:5672/sensiolabs_amqp" />
2223
</php>
2324

2425
<testsuites>
+44Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Console\Input\InputArgument;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Output\OutputInterface;
8+
9+
class WorkerAmqpMoveCommand extends ContainerAwareCommand
10+
{
11+
private $broker;
12+
private $logger;
13+
14+
protected function configure()
15+
{
16+
$this
17+
->setName('worker:amqp:move')
18+
->setDescription('Take all messages from a queue, and send them to the default exchange with a new routing key.')
19+
->setDefinition(array(
20+
new InputArgument('from', InputArgument::REQUIRED, 'The queue.'),
21+
new InputArgument('to', InputArgument::REQUIRED, 'The new routing key.'),
22+
))
23+
;
24+
}
25+
26+
protected function initialize(InputInterface $input, OutputInterface $output)
27+
{
28+
$this->broker = $this->getContainer()->get('amqp.broker');
29+
$this->logger = $this->getContainer()->get('logger');
30+
}
31+
32+
protected function execute(InputInterface $input, OutputInterface $output)
33+
{
34+
$from = $input->getArgument('from');
35+
$to = $input->getArgument('to');
36+
37+
while (false !== $message = $this->broker->get($from)) {
38+
$this->logger->info('Move a message...');
39+
$this->broker->move($message, $to);
40+
$this->broker->ack($message);
41+
$this->logger->debug('...message moved.');
42+
}
43+
}
44+
}
+26Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Console\Input\InputInterface;
6+
use Symfony\Component\Console\Output\OutputInterface;
7+
8+
class WorkerListCommand extends ContainerAwareCommand
9+
{
10+
protected function configure()
11+
{
12+
$this
13+
->setName('worker:list')
14+
->setDescription('List available workers.')
15+
;
16+
}
17+
18+
protected function execute(InputInterface $input, OutputInterface $output)
19+
{
20+
$workers = $this->getContainer()->getParameter('worker.workers');
21+
22+
foreach ($workers as $name => $_) {
23+
$output->writeln($name);
24+
}
25+
}
26+
}
+66Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Symfony\Bundle\FrameworkBundle\Command;
4+
5+
use Symfony\Component\Amqp\Worker\ConfigurableLoopInterface;
6+
use Symfony\Component\Console\Input\InputArgument;
7+
use Symfony\Component\Console\Input\InputInterface;
8+
use Symfony\Component\Console\Input\InputOption;
9+
use Symfony\Component\Console\Output\OutputInterface;
10+
11+
class WorkerRunCommand extends ContainerAwareCommand
12+
{
13+
protected function configure()
14+
{
15+
$this
16+
->setName('worker:run')
17+
->setDescription('Run a worker')
18+
->setDefinition(array(
19+
new InputArgument('worker', InputArgument::REQUIRED, 'The worker'),
20+
new InputOption('name', null, InputOption::VALUE_REQUIRED, 'A name, useful for stats/monitoring. Defaults to worker name.'),
21+
))
22+
;
23+
}
24+
25+
protected function execute(InputInterface $input, OutputInterface $output)
26+
{
27+
$loop = $this->getLoop($input);
28+
29+
$loopName = $input->getOption('name') ?: $loop->getName();
30+
31+
if ($loop instanceof ConfigurableLoopInterface) {
32+
$loop->setName($loopName);
33+
}
34+
35+
$processName = sprintf('%s_%s', $this->getContainer()->getParameter('worker.cli_title_prefix'), $loopName);
36+
37+
// On OSX, it may raise an error:
38+
// Warning: cli_set_process_title(): cli_set_process_title had an error: Not initialized correctly
39+
@cli_set_process_title($processName);
40+
41+
pcntl_signal(SIGTERM, function () use ($loop) {
42+
$loop->stop('Signaled with SIGTERM.');
43+
});
44+
pcntl_signal(SIGINT, function () use ($loop) {
45+
$loop->stop('Signaled with SIGINT.');
46+
});
47+
48+
$loop->run();
49+
}
50+
51+
private function getLoop(InputInterface $input)
52+
{
53+
$workers = $this->getContainer()->getParameter('worker.workers');
54+
55+
$workerName = $input->getArgument('worker');
56+
57+
if (!array_key_exists($workerName, $workers)) {
58+
throw new \InvalidArgumentException(sprintf(
59+
'The worker "%s" does not exist. Available ones are: "%s".',
60+
$workerName, implode('", "', array_keys($workers))
61+
));
62+
}
63+
64+
return $this->getContainer()->get($workers[$workerName]);
65+
}
66+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.