Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 49fcf13

Browse filesBrowse files
committed
Introduced a SimplerWorker
1 parent d30830c commit 49fcf13
Copy full SHA for 49fcf13

File tree

8 files changed

+744
-216
lines changed
Filter options

8 files changed

+744
-216
lines changed

‎src/Symfony/Component/Amqp/Broker.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Amqp/Broker.php
+60-148Lines changed: 60 additions & 148 deletions
Large diffs are not rendered by default.
+57Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Amqp;
13+
14+
// WIP
15+
16+
interface BrokerInterface
17+
{
18+
public function connect();
19+
20+
public function disconnect();
21+
22+
public function isConnected(): bool;
23+
24+
public function getConnection(): \AMQPConnection;
25+
26+
public function getChannel(): \AMQPChannel;
27+
28+
public function publish(string $routingKey, string $message, array $attributes = array()): bool;
29+
30+
public function move(\AMQPEnvelope $msg, string $routingKey, array $attributes = array()): bool;
31+
32+
public function moveToDeadLetter(\AMQPEnvelope $msg, array $attributes = array()): bool;
33+
34+
public function consume(string $queueName, callable $callback = null, int $flags = \AMQP_NOPARAM, string $consumerTag = null);
35+
36+
public function get(string $queueName, int $flags = \AMQP_NOPARAM);
37+
38+
public function ack(\AMQPEnvelope $msg, int $flags = \AMQP_NOPARAM, string $queueName = null): bool;
39+
40+
public function nack(\AMQPEnvelope $msg, int $flags = \AMQP_NOPARAM, string $queueName = null): bool;
41+
42+
public function addQueue(\AmqpQueue $queue);
43+
44+
public function hasQueue(string $queueName);
45+
46+
public function getQueue(string $queueName);
47+
48+
public function setQueues(array $queues);
49+
50+
public function addExchange(\AmqpExchange $exchange);
51+
52+
public function hasExchange(string $exchangeName);
53+
54+
public function getExchange(string $exchangeName);
55+
56+
public function setExchanges(array $exchanges);
57+
}
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Amqp\Exception;
13+
14+
/**
15+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
16+
*/
17+
class UndefinedExchangeException extends LogicException implements ExceptionInterface
18+
{
19+
}
+19Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Amqp\Exception;
13+
14+
/**
15+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
16+
*/
17+
class UndefinedQueueException extends LogicException implements ExceptionInterface
18+
{
19+
}

‎src/Symfony/Component/Amqp/Queue.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Amqp/Queue.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,9 @@ public function declareAndBind()
116116
{
117117
$this->declareQueue();
118118

119-
foreach ($this->bindings as $exchange => $configs) {
119+
foreach ($this->bindings as $exchangeName => $configs) {
120120
foreach ($configs as $config) {
121-
parent::bind($exchange, $config['routing_key'], $config['bind_arguments']);
121+
parent::bind($exchangeName, $config['routing_key'], $config['bind_arguments']);
122122
}
123123
}
124124
}
+256Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Amqp;
13+
14+
use Symfony\Component\Amqp\Exception\UndefinedExchangeException;
15+
use Symfony\Component\Amqp\Exception\UndefinedQueueException;
16+
17+
/**
18+
* @author Fabien Potencier <fabien@symfony.com>
19+
* @author Grégoire Pineau <lyrixx@lyrixx.info>
20+
*/
21+
class SimpleBroker
22+
{
23+
const DEFAULT_EXCHANGE = 'symfony.default';
24+
const DEAD_LETTER_EXCHANGE = 'symfony.dead_letter';
25+
const RETRY_EXCHANGE = 'symfony.retry';
26+
27+
private $connection;
28+
private $channel;
29+
private $queues = array();
30+
private $exchanges = array();
31+
32+
public function __construct(\AMQPConnection $connection, array $queues = array(), array $exchanges = array())
33+
{
34+
$this->connection = $connection;
35+
$this->connection->setReadTimeout(4 * 60 * 60);
36+
$this->setQueues($queues);
37+
$this->setExchanges($exchanges);
38+
}
39+
40+
public static function createWithDsn(string $dsn = 'amqp://guest:guest@localhost:5672/', array $queues, array $exchanges): self
41+
{
42+
$connection = new \AMQPConnection(DsnParser::parseDsn($dsn));
43+
44+
return new self($connection, $queues, $exchanges);
45+
}
46+
47+
public function connect()
48+
{
49+
if (!$this->connection->isConnected()) {
50+
$this->connection->connect();
51+
}
52+
53+
if (!$this->channel) {
54+
$this->channel = new \AMQPChannel($this->connection);
55+
}
56+
}
57+
58+
public function disconnect()
59+
{
60+
$this->channel = null;
61+
62+
if ($this->connection->isConnected()) {
63+
$this->connection->disconnect();
64+
}
65+
}
66+
67+
public function isConnected(): bool
68+
{
69+
return $this->connection->isConnected();
70+
}
71+
72+
public function getConnection(): \AMQPConnection
73+
{
74+
return $this->connection;
75+
}
76+
77+
public function getChannel(): \AMQPChannel
78+
{
79+
if (null === $this->channel) {
80+
$this->connect();
81+
}
82+
83+
return $this->channel;
84+
}
85+
86+
/**
87+
* Publishes a new message.
88+
*
89+
* Special attributes:
90+
*
91+
* * flags: if set, will be used during the Exchange::publish call
92+
* * exchange: The exchange name to use ("symfony.default" by default)
93+
*/
94+
public function publish(string $routingKey, string $message, array $attributes = array()): bool
95+
{
96+
$flags = $attributes['flags'] ?? \AMQP_MANDATORY;
97+
unset($attributes['flags']);
98+
99+
$exchangeName = $attributes['exchange'] ?? self::DEFAULT_EXCHANGE;
100+
unset($attributes['exchange']);
101+
102+
return $this->getExchange($exchangeName)->publish($message, $routingKey, $flags, $attributes);
103+
}
104+
105+
/**
106+
* Moves a message to a given route.
107+
*
108+
* If attributes are given as third argument they will override the
109+
* message ones.
110+
*/
111+
public function move(\AMQPEnvelope $msg, string $routingKey, array $attributes = array()): bool
112+
{
113+
static $map = array(
114+
'app_id' => 'getAppId',
115+
'content_encoding' => 'getContentEncoding',
116+
'content_type' => 'getContentType',
117+
'delivery_mode' => 'getDeliveryMode',
118+
'expiration' => 'getExpiration',
119+
'headers' => 'getHeaders',
120+
'message_id' => 'getMessageId',
121+
'priority' => 'getPriority',
122+
'reply_to' => 'getReplyTo',
123+
'timestamp' => 'getTimestamp',
124+
'type' => 'getType',
125+
'user_id' => 'getUserId',
126+
'exchange' => null,
127+
);
128+
129+
$originalAttributes = array();
130+
131+
foreach ($map as $key => $method) {
132+
if (isset($attributes[$key])) {
133+
$originalAttributes[$key] = $attributes[$key];
134+
135+
continue;
136+
}
137+
138+
if (!$method) {
139+
continue;
140+
}
141+
142+
$value = $msg->{$method}();
143+
if ($value) {
144+
$originalAttributes[$key] = $value;
145+
}
146+
}
147+
148+
return $this->publish($routingKey, $msg->getBody(), $originalAttributes);
149+
}
150+
151+
public function moveToDeadLetter(\AMQPEnvelope $msg, array $attributes = array()): bool
152+
{
153+
return $this->move($msg, $msg->getRoutingKey().'.dead', $attributes);
154+
}
155+
156+
public function consume(string $queueName, callable $callback = null, int $flags = \AMQP_NOPARAM, string $consumerTag = null)
157+
{
158+
$this->getQueue($queueName)->consume($callback, $flags, $consumerTag);
159+
}
160+
161+
/**
162+
* Gets an Envelope from a Queue by its given name.
163+
*
164+
* @return \AMQPEnvelope|bool An enveloppe or false
165+
*/
166+
public function get(string $queueName, int $flags = \AMQP_NOPARAM)
167+
{
168+
return $this->getQueue($queueName)->get($flags);
169+
}
170+
171+
/**
172+
* Ack a message.
173+
*
174+
* WARNING: This shortcut only works when using the conventions
175+
* where the queue and the routing queue have the same name.
176+
*
177+
* If it's not the case, you MUST specify the queueName.
178+
*/
179+
public function ack(\AMQPEnvelope $msg, int $flags = \AMQP_NOPARAM, string $queueName = null): bool
180+
{
181+
$queue = $this->getQueue($queueName ?: $msg->getRoutingKey());
182+
183+
return $queue->ack($msg->getDeliveryTag(), $flags);
184+
}
185+
186+
/**
187+
* Nack a message.
188+
*
189+
* WARNING: This shortcut only works when using the conventions
190+
* where the queue and the routing queue have the same name.
191+
*
192+
* If it's not the case, you MUST specify the queueName.
193+
*/
194+
public function nack(\AMQPEnvelope $msg, int $flags = \AMQP_NOPARAM, string $queueName = null): bool
195+
{
196+
$queue = $this->getQueue($queueName ?: $msg->getRoutingKey());
197+
198+
return $queue->nack($msg->getDeliveryTag(), $flags);
199+
}
200+
201+
public function addQueue(\AmqpQueue $queue)
202+
{
203+
$this->queues[$queue->getName()] = $queue;
204+
}
205+
206+
public function hasQueue(string $queueName)
207+
{
208+
return (bool) ($this->queues[$queueName] ?? false);
209+
}
210+
211+
public function getQueue(string $queueName)
212+
{
213+
$queue = $this->queues[$queueName] ?? false;
214+
215+
if (!$queue) {
216+
throw new UndefinedQueueException(sprintf('The queue "%s" does not exist.', $queueName));
217+
}
218+
219+
return $queue;
220+
}
221+
222+
public function setQueues(array $queues)
223+
{
224+
foreach ($queues as $queue) {
225+
$this->addQueue($queue);
226+
}
227+
}
228+
229+
public function addExchange(\AmqpExchange $exchange)
230+
{
231+
$this->exchanges[$exchange->getName()] = $exchange;
232+
}
233+
234+
public function hasExchange(string $exchangeName)
235+
{
236+
return (bool) ($this->exchanges[$exchangeName] ?? false);
237+
}
238+
239+
public function getExchange(string $exchangeName)
240+
{
241+
$exchange = $this->exchanges[$exchangeName] ?? false;
242+
243+
if (!$exchange) {
244+
throw new UndefinedExchangeException(sprintf('The exchange "%s" does not exist.', $exchangeName));
245+
}
246+
247+
return $exchange;
248+
}
249+
250+
public function setExchanges(array $exchanges)
251+
{
252+
foreach ($exchanges as $exchange) {
253+
$this->addExchange($exchange);
254+
}
255+
}
256+
}

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.