Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

[Messenger] multiple failure transports support #34979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Prev Previous commit
wip
  • Loading branch information
monteiro committed Oct 3, 2020
commit 353fc62807b1c3ab3c0e7423a3708a88f252873e
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
->replaceArgument(2, $config['serializer']['symfony_serializer']['context']);
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
}

$senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
Expand All @@ -1802,7 +1802,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
->addTag('messenger.receiver', ['alias' => $name])
->addTag('messenger.receiver', [
'alias' => $name,
'failure_transport' => $transport['failure_transport'] ?? null
]
)
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
Expand Down Expand Up @@ -1873,8 +1877,9 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}

$failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']];
$container->setAlias('messenger.failure_transports.default_transport', $config['failure_transport']);
}

foreach ($config['transports'] as $name => $transport) {
Expand Down Expand Up @@ -1909,8 +1914,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->getDefinition($failureTransportsByTransportNameServiceLocatorId)
->replaceArgument(0, $failureTransportsByTransportName);
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null)
->replaceArgument(2, $failureTransportsByTransportNameServiceLocator);
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
} else {
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds)
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
$messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method));

throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" "%s" not found.', $serviceId, $message, $messageLocation));
throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" %s not found.', $serviceId, $message, $messageLocation));
}

if (!$r->hasMethod($method)) {
Expand Down Expand Up @@ -298,6 +298,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds)
}

$container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping);

$failedCommandIds = [
'console.command.messenger_failed_messages_retry',
'console.command.messenger_failed_messages_show',
'console.command.messenger_failed_messages_remove',
];
foreach ($failedCommandIds as $failedCommandId) {
if ($container->hasDefinition($failedCommandId)) {
$definition = $container->getDefinition($failedCommandId);
$definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]);
}
}
}

private function registerBusToCollector(ContainerBuilder $container, string $busId)
Expand Down
Morty Proxy This is a proxified and sanitized view of the page, visit original site.