Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger]  Allow InMemoryTransport to serialize message #39075

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions 5 src/Symfony/Component/Messenger/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGELOG
=========

5.3.0
-----

* `InMemoryTransport` can perform message serialization through dsn `in-memory://?serialize=true`.

5.2.0
-----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,35 @@ public function testCreateTransport()
$this->assertInstanceOf(InMemoryTransport::class, $this->factory->createTransport('in-memory://', [], $serializer));
}

public function testCreateTransportWithoutSerializer()
{
/** @var SerializerInterface $serializer */
$serializer = $this->createMock(SerializerInterface::class);
$serializer
->expects($this->never())
->method('encode')
;
$transport = $this->factory->createTransport('in-memory://?serialize=false', [], $serializer);
$message = Envelope::wrap(new DummyMessage('Hello.'));
$transport->send($message);

$this->assertSame([$message], $transport->get());
}

public function testCreateTransportWithSerializer()
{
/** @var SerializerInterface $serializer */
$serializer = $this->createMock(SerializerInterface::class);
$message = Envelope::wrap(new DummyMessage('Hello.'));
$serializer
->expects($this->once())
->method('encode')
->with($this->equalTo($message))
;
$transport = $this->factory->createTransport('in-memory://?serialize=true', [], $serializer);
$transport->send($message);
}

public function testResetCreatedTransports()
{
$transport = $this->factory->createTransport('in-memory://', [], $this->createMock(SerializerInterface::class));
Expand All @@ -63,6 +92,8 @@ public function provideDSN(): array
{
return [
'Supported' => ['in-memory://foo'],
'Serialize enabled' => ['in-memory://?serialize=true'],
'Serialize disabled' => ['in-memory://?serialize=false'],
'Unsupported' => ['amqp://bar', false],
];
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
use PHPUnit\Framework\TestCase;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Tests\Fixtures\AnEnvelopeStamp;
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\InMemoryTransport;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* @author Gary PEGEOT <garypegeot@gmail.com>
Expand All @@ -26,9 +28,21 @@ class InMemoryTransportTest extends TestCase
*/
private $transport;

/**
* @var InMemoryTransport
*/
private $serializeTransport;

/**
* @var SerializerInterface
*/
private $serializer;

protected function setUp(): void
{
$this->serializer = $this->createMock(SerializerInterface::class);
$this->transport = new InMemoryTransport();
$this->serializeTransport = new InMemoryTransport($this->serializer);
}

public function testSend()
Expand All @@ -38,6 +52,24 @@ public function testSend()
$this->assertSame([$envelope], $this->transport->getSent());
}

public function testSendWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->send($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getSent());
}

public function testQueue()
{
$envelope1 = new Envelope(new \stdClass());
Expand All @@ -51,6 +83,24 @@ public function testQueue()
$this->assertSame([], $this->transport->get());
}

public function testQueueWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->send($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->get());
}

public function testAcknowledgeSameMessageWithDifferentStamps()
{
$envelope1 = new Envelope(new \stdClass(), [new AnEnvelopeStamp()]);
Expand All @@ -71,13 +121,49 @@ public function testAck()
$this->assertSame([$envelope], $this->transport->getAcknowledged());
}

public function testAckWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->ack($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getAcknowledged());
}

public function testReject()
{
$envelope = new Envelope(new \stdClass());
$this->transport->reject($envelope);
$this->assertSame([$envelope], $this->transport->getRejected());
}

public function testRejectWithSerialization()
{
$envelope = new Envelope(new \stdClass());
$envelopeDecoded = Envelope::wrap(new DummyMessage('Hello.'));
$this->serializer
->method('encode')
->with($this->equalTo($envelope))
->willReturn(['foo' => 'ba'])
;
$this->serializer
->method('decode')
->with(['foo' => 'ba'])
->willReturn($envelopeDecoded)
;
$this->serializeTransport->reject($envelope);
$this->assertSame([$envelopeDecoded], $this->serializeTransport->getRejected());
}

public function testReset()
{
$envelope = new Envelope(new \stdClass());
Expand Down
57 changes: 49 additions & 8 deletions 57 src/Symfony/Component/Messenger/Transport/InMemoryTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace Symfony\Component\Messenger\Transport;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Contracts\Service\ResetInterface;

/**
Expand Down Expand Up @@ -41,20 +42,30 @@ class InMemoryTransport implements TransportInterface, ResetInterface
*/
private $queue = [];

/**
* @var SerializerInterface|null
*/
private $serializer;

public function __construct(SerializerInterface $serializer = null)
{
$this->serializer = $serializer;
}

/**
* {@inheritdoc}
*/
public function get(): iterable
{
return array_values($this->queue);
return array_values($this->decode($this->queue));
}

/**
* {@inheritdoc}
*/
public function ack(Envelope $envelope): void
{
$this->acknowledged[] = $envelope;
$this->acknowledged[] = $this->encode($envelope);
$id = spl_object_hash($envelope->getMessage());
unset($this->queue[$id]);
}
Expand All @@ -64,7 +75,7 @@ public function ack(Envelope $envelope): void
*/
public function reject(Envelope $envelope): void
{
$this->rejected[] = $envelope;
$this->rejected[] = $this->encode($envelope);
$id = spl_object_hash($envelope->getMessage());
unset($this->queue[$id]);
}
Expand All @@ -74,9 +85,10 @@ public function reject(Envelope $envelope): void
*/
public function send(Envelope $envelope): Envelope
{
$this->sent[] = $envelope;
$encodedEnvelope = $this->encode($envelope);
$this->sent[] = $encodedEnvelope;
$id = spl_object_hash($envelope->getMessage());
$this->queue[$id] = $envelope;
$this->queue[$id] = $encodedEnvelope;

return $envelope;
}
Expand All @@ -91,22 +103,51 @@ public function reset()
*/
public function getAcknowledged(): array
{
return $this->acknowledged;
return $this->decode($this->acknowledged);
}

/**
* @return Envelope[]
*/
public function getRejected(): array
{
return $this->rejected;
return $this->decode($this->rejected);
}

/**
* @return Envelope[]
*/
public function getSent(): array
{
return $this->sent;
return $this->decode($this->sent);
}

/**
* @return Envelope|array
*/
private function encode(Envelope $envelope)
{
if (null === $this->serializer) {
return $envelope;
}

return $this->serializer->encode($envelope);
}

/**
* @param array<mixed> $messagesEncoded
*
* @return Envelope[]
*/
private function decode(array $messagesEncoded): array
{
if (null === $this->serializer) {
return $messagesEncoded;
}

return array_map(
[$this->serializer, 'decode'],
$messagesEncoded
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class InMemoryTransportFactory implements TransportFactoryInterface, ResetInterf

public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return $this->createdTransports[] = new InMemoryTransport();
['serialize' => $serialize] = $this->parseDsn($dsn);

return $this->createdTransports[] = new InMemoryTransport($serialize ? $serializer : null);
}

public function supports(string $dsn, array $options): bool
Expand All @@ -40,4 +42,16 @@ public function reset()
$transport->reset();
}
}

private function parseDsn(string $dsn): array
{
$query = [];
if ($queryAsString = strstr($dsn, '?')) {
parse_str(ltrim($queryAsString, '?'), $query);
}

return [
'serialize' => filter_var($query['serialize'] ?? false, \FILTER_VALIDATE_BOOLEAN),
];
}
}
Morty Proxy This is a proxified and sanitized view of the page, visit original site.