Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] Add TransportInterface as first class citizen sender+receiver #27164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
Expand Down Expand Up @@ -1506,19 +1507,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
}

$senderDefinition = (new Definition(SenderInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createSender'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.sender.'.$name, $senderDefinition);

$receiverDefinition = (new Definition(ReceiverInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createReceiver'))
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('name' => $name))
->addTag('messenger.sender', array('name' => $name))
;
$container->setDefinition('messenger.receiver.'.$name, $receiverDefinition);
$container->setDefinition('messenger.transport.'.$name, $transportDefinition);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,30 +533,20 @@ public function testMessenger()
public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.sender.default'));
$this->assertTrue($container->getDefinition('messenger.sender.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.sender.default')->getTag('messenger.sender'));
$this->assertTrue($container->hasDefinition('messenger.receiver.default'));
$this->assertTrue($container->getDefinition('messenger.receiver.default')->hasTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.receiver.default')->getTag('messenger.receiver'));

$this->assertTrue($container->hasDefinition('messenger.sender.customised'));
$senderFactory = $container->getDefinition('messenger.sender.customised')->getFactory();
$senderArguments = $container->getDefinition('messenger.sender.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createSender'), $senderFactory);
$this->assertCount(2, $senderArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $senderArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $senderArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.receiver.customised'));
$receiverFactory = $container->getDefinition('messenger.receiver.customised')->getFactory();
$receiverArguments = $container->getDefinition('messenger.receiver.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createReceiver'), $receiverFactory);
$this->assertCount(2, $receiverArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $receiverArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $receiverArguments[1]);
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('name' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));

$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
$transportArguments = $container->getDefinition('messenger.transport.customised')->getArguments();

$this->assertEquals(array(new Reference('messenger.transport_factory'), 'createTransport'), $transportFactory);
$this->assertCount(2, $transportArguments);
$this->assertSame('amqp://localhost/%2f/messages?exchange_name=exchange_name', $transportArguments[0]);
$this->assertSame(array('queue' => array('name' => 'Queue')), $transportArguments[1]);

$this->assertTrue($container->hasDefinition('messenger.transport.amqp.factory'));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public function __construct(EncoderInterface $messageEncoder, Connection $connec
/**
* {@inheritdoc}
*/
public function send($message)
public function send($message): void
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not fond of the idea of enforcing void as a result. Senders might return something later on, we shouldn’t close that door IMHO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removing void should be ok for the bc policy. See symfony/symfony-docs#9717

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

{
$encodedMessage = $this->messageEncoder->encode($message);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Nicolas Grekas <p@tchwork.com>
*/
class AmqpTransport implements TransportInterface
{
private $encoder;
private $decoder;
private $dsn;
private $options;
private $debug;
private $connection;
private $receiver;
private $sender;

public function __construct(EncoderInterface $encoder, DecoderInterface $decoder, string $dsn, array $options, bool $debug)
{
$this->encoder = $encoder;
$this->decoder = $decoder;
$this->dsn = $dsn;
$this->options = $options;
$this->debug = $debug;
}

/**
* {@inheritdoc}
*/
public function receive(callable $handler): void
{
($this->receiver ?? $this->getReceiver())->receive($hander);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ternary operator fits better here (same below) ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what you mean, doesn't feel like to me :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @yceruto mean next variant:

$this->receiver ? $this->receiver->receive($hander) ? $this->getReceiver()->receive($hander)

Copy link
Member

@yceruto yceruto May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant ($this->receiver ?: $this->getReceiver())-> instead, as $this->receiver is always defined I don't see the need for ??, but probably I'm missing something here :)

Copy link
Contributor

@ogizanagi ogizanagi May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$this->receiver is null until $this->getReceiver() is called a first time. So makes sense to use the null-coalescing operator rather than ternary for this :)

Copy link
Member

@yceruto yceruto May 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the manual:

The null coalescing operator (??) has been added as syntactic sugar for the common case of needing to use a ternary in conjunction with isset(). It returns its first operand if IT EXISTS and is not NULL; otherwise it returns its second operand.

So ternary operator fits better in this case as $this->receiver always exists (i.e. this var is defined always), but NVM it's just a detail, ?? works fine too.

}

/**
* {@inheritdoc}
*/
public function stop(): void
{
($this->receiver ?? $this->getReceiver())->stop();
}

/**
* {@inheritdoc}
*/
public function send($message): void
{
($this->sender ?? $this->getSender())->send($message);
}

private function getReceiver()
{
return $this->receiver = new AmqpReceiver($this->decoder, $this->connection ?? $this->getConnection());
}

private function getSender()
{
return $this->sender = new AmqpSender($this->encoder, $this->connection ?? $this->getConnection());
}

private function getConnection()
{
return $this->connection = new Connection($this->dsn, $this->options, $this->debug);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Transport\Factory\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\DecoderInterface;
use Symfony\Component\Messenger\Transport\Serialization\EncoderInterface;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
Expand All @@ -33,14 +32,9 @@ public function __construct(EncoderInterface $encoder, DecoderInterface $decoder
$this->debug = $debug;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
public function createTransport(string $dsn, array $options): TransportInterface
{
return new AmqpReceiver($this->decoder, Connection::fromDsn($dsn, $options, $this->debug));
}

public function createSender(string $dsn, array $options): SenderInterface
{
return new AmqpSender($this->encoder, Connection::fromDsn($dsn, $options, $this->debug));
return new AmqpTransport($this->encoder, $this->decoder, $dsn, $options, $thid->debug);
}

public function supports(string $dsn, array $options): bool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ interface SenderInterface
*
* @param object $message
*/
public function send($message);
public function send($message): void;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Factory;

use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
namespace Symfony\Component\Messenger\Transport;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class ChainTransportFactory implements TransportFactoryInterface
class TransportFactory implements TransportFactoryInterface
{
private $factories;

Expand All @@ -29,22 +26,11 @@ public function __construct(iterable $factories)
$this->factories = $factories;
}

public function createReceiver(string $dsn, array $options): ReceiverInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createReceiver($dsn, $options);
}
}

throw new \InvalidArgumentException(sprintf('No transport supports the given DSN "%s".', $dsn));
}

public function createSender(string $dsn, array $options): SenderInterface
public function createTransport(string $dsn, array $options): TransportInterface
{
foreach ($this->factories as $factory) {
if ($factory->supports($dsn, $options)) {
return $factory->createSender($dsn, $options);
return $factory->createTransport($dsn, $options);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport\Factory;

use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
namespace Symfony\Component\Messenger\Transport;

/**
* Creates a Messenger transport.
Expand Down
21 changes: 21 additions & 0 deletions 21 src/Symfony/Component/Messenger/Transport/TransportInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Transport;

/**
* @author Nicolas Grekas <p@tchwork.com>
*
* @experimental in 4.1
*/
interface TransportInterface extends ReceiverInterface, SenderInterface
{
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.