Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a10725b

Browse filesBrowse files
committed
Add standalone setup broker feature.
1 parent dc0d34d commit a10725b
Copy full SHA for a10725b

File tree

8 files changed

+187
-0
lines changed
Filter options

8 files changed

+187
-0
lines changed

‎pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

Copy file name to clipboardExpand all lines: pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public function load(array $configs, ContainerBuilder $container): void
5959
$transportFactory->buildContext($container, []);
6060
$transportFactory->buildQueueConsumer($container, $modules['consumption']);
6161
$transportFactory->buildRpcClient($container, []);
62+
$transportFactory->buildSetupBroker($container, []);
6263

6364
// client
6465
if (isset($modules['client'])) {

‎pkg/enqueue-bundle/EnqueueBundle.php

Copy file name to clipboardExpand all lines: pkg/enqueue-bundle/EnqueueBundle.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass;
1515
use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass;
1616
use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass;
17+
use Enqueue\Symfony\DependencyInjection\BuildSetupBrokerPass;
1718
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
1819
use Symfony\Component\DependencyInjection\ContainerBuilder;
1920
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -25,6 +26,7 @@ public function build(ContainerBuilder $container): void
2526
//transport passes
2627
$container->addCompilerPass(new BuildConsumptionExtensionsPass());
2728
$container->addCompilerPass(new BuildProcessorRegistryPass());
29+
$container->addCompilerPass(new BuildSetupBrokerPass());
2830

2931
//client passes
3032
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass());

‎pkg/enqueue-bundle/Resources/config/services.yml

Copy file name to clipboardExpand all lines: pkg/enqueue-bundle/Resources/config/services.yml
+9Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ services:
1515
tags:
1616
- { name: 'console.command' }
1717

18+
enqueue.transport.setup_broker:
19+
class: 'Enqueue\Symfony\SetupBrokerCommand'
20+
arguments:
21+
- '@enqueue.locator'
22+
- '%enqueue.default_transport%'
23+
- 'enqueue.transport.%s.setup_broker'
24+
tags:
25+
- { name: 'console.command' }
26+
1827
enqueue.client.consume_command:
1928
class: 'Enqueue\Symfony\Client\ConsumeCommand'
2029
arguments:
+33Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
<?php
2+
namespace Enqueue\SetupBroker;
3+
4+
use Psr\Log\LoggerInterface;
5+
6+
class ChainSetupBroker implements SetupBrokerInterface
7+
{
8+
/**
9+
* @var SetupBrokerInterface[]
10+
*/
11+
private $setupBrokers;
12+
13+
/**
14+
* @param SetupBrokerInterface[] $setupBrokers
15+
*/
16+
public function __construct(array $setupBrokers)
17+
{
18+
$this->setupBrokers = [];
19+
20+
array_walk($setupBrokers, function (SetupBrokerInterface $setupBroker) {
21+
$this->setupBrokers[] = $setupBroker;
22+
});
23+
}
24+
25+
public function setupBroker(LoggerInterface $logger = null): void
26+
{
27+
foreach ($this->setupBrokers as $setupBroker) {
28+
$logger->info(get_class($setupBroker));
29+
30+
$setupBroker->setupBroker($logger);
31+
}
32+
}
33+
}
+9Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
namespace Enqueue\SetupBroker;
3+
4+
use Psr\Log\LoggerInterface;
5+
6+
interface SetupBrokerInterface
7+
{
8+
public function setupBroker(LoggerInterface $logger = null): void;
9+
}
+47Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\DependencyInjection;
4+
5+
use Enqueue\Symfony\DiUtils;
6+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
7+
use Symfony\Component\DependencyInjection\ContainerBuilder;
8+
use Symfony\Component\DependencyInjection\Reference;
9+
10+
final class BuildSetupBrokerPass implements CompilerPassInterface
11+
{
12+
public function process(ContainerBuilder $container): void
13+
{
14+
if (false == $container->hasParameter('enqueue.transports')) {
15+
throw new \LogicException('The "enqueue.transports" parameter must be set.');
16+
}
17+
18+
$names = $container->getParameter('enqueue.transports');
19+
$defaultName = $container->getParameter('enqueue.default_transport');
20+
21+
foreach ($names as $name) {
22+
$diUtils = DiUtils::create(TransportFactory::MODULE, $name);
23+
24+
$setupBrokerId = $diUtils->format('setup_broker');
25+
if (false == $container->hasDefinition($setupBrokerId)) {
26+
throw new \LogicException(sprintf('Service "%s" not found', $setupBrokerId));
27+
}
28+
29+
$tag = 'enqueue.transport.setup_broker';
30+
$map = [];
31+
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
32+
foreach ($tagAttributes as $tagAttribute) {
33+
$transport = $tagAttribute['transport'] ?? $defaultName;
34+
35+
if ($transport !== $name && 'all' !== $transport) {
36+
continue;
37+
}
38+
39+
$map[] = new Reference($serviceId);
40+
}
41+
}
42+
43+
$setupBroker = $container->getDefinition($setupBrokerId);
44+
$setupBroker->setArgument(0, $map);
45+
}
46+
}
47+
}

‎pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php

Copy file name to clipboardExpand all lines: pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php
+10Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
use Enqueue\Resources;
1212
use Enqueue\Rpc\RpcClient;
1313
use Enqueue\Rpc\RpcFactory;
14+
use Enqueue\SetupBroker\ChainSetupBroker;
1415
use Enqueue\Symfony\ContainerProcessorRegistry;
1516
use Enqueue\Symfony\DiUtils;
1617
use Interop\Queue\ConnectionFactory;
@@ -243,6 +244,15 @@ public function buildRpcClient(ContainerBuilder $container, array $config): void
243244
}
244245
}
245246

247+
public function buildSetupBroker(ContainerBuilder $container, array $config): void
248+
{
249+
$container->register($this->diUtils->format('setup_broker'), ChainSetupBroker::class)
250+
->addArgument([])
251+
;
252+
253+
$this->addServiceToLocator($container, 'setup_broker');
254+
}
255+
246256
private function assertServiceExists(ContainerBuilder $container, string $serviceId): void
247257
{
248258
if (false == $container->hasDefinition($serviceId)) {
+76Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony;
4+
5+
use Enqueue\Client\DriverInterface;
6+
use Enqueue\SetupBroker\SetupBrokerInterface;
7+
use Psr\Container\ContainerInterface;
8+
use Psr\Container\NotFoundExceptionInterface;
9+
use Symfony\Component\Console\Command\Command;
10+
use Symfony\Component\Console\Input\InputInterface;
11+
use Symfony\Component\Console\Input\InputOption;
12+
use Symfony\Component\Console\Logger\ConsoleLogger;
13+
use Symfony\Component\Console\Output\OutputInterface;
14+
15+
class SetupBrokerCommand extends Command
16+
{
17+
protected static $defaultName = 'enqueue:transport:setup-broker';
18+
19+
/**
20+
* @var ContainerInterface
21+
*/
22+
private $container;
23+
24+
/**
25+
* @var string
26+
*/
27+
private $defaultTransport;
28+
29+
/**
30+
* @var string
31+
*/
32+
private $setupBrokerIdPattern;
33+
34+
public function __construct(ContainerInterface $container, string $defaultTransport, string $setupBrokerIdPattern = 'enqueue.transport.%s.setup_broker')
35+
{
36+
$this->container = $container;
37+
$this->defaultTransport = $defaultTransport;
38+
$this->setupBrokerIdPattern = $setupBrokerIdPattern;
39+
40+
parent::__construct(static::$defaultName);
41+
}
42+
43+
protected function configure(): void
44+
{
45+
$this
46+
->setDescription('Setup broker. Configure the broker, creates queues, topics and so on.')
47+
->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', $this->defaultTransport)
48+
;
49+
}
50+
51+
protected function execute(InputInterface $input, OutputInterface $output): ?int
52+
{
53+
$transport = $input->getOption('transport');
54+
55+
$logger = new ConsoleLogger($output);
56+
57+
try {
58+
$setupBroker = $this->getSetupBroker($transport);
59+
60+
$logger->info(get_class($setupBroker));
61+
62+
$setupBroker->setupBroker();
63+
} catch (NotFoundExceptionInterface $e) {
64+
throw new \LogicException(sprintf('Transport "%s" is not supported.', $transport), null, $e);
65+
}
66+
67+
$output->writeln('Broker set up');
68+
69+
return null;
70+
}
71+
72+
private function getSetupBroker(string $transport): SetupBrokerInterface
73+
{
74+
return $this->container->get(sprintf($this->setupBrokerIdPattern, $transport));
75+
}
76+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.