[Messenger] Messages stuck in unack state in the queue when 2 batch handlers are involved #59958

Open
@pmishev

Symfony version(s) affected

6.4.18

Description

When messages for more than one batch handler are routed to the same transport, some of them are never acknowledged and remain stuck in the queue in an unacked state.

Possibly related to #58433

How to reproduce

framework:
    messenger:
        routing:
            'App\Message\Command\FirstCommand': main_normal_priority
            'App\Message\Command\SecondCommand': main_normal_priority
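The routing above refers to a transport named main_normal_priority, and the handlers below refer to a bus named command.bus, but neither definition is shown. A minimal sketch of the surrounding configuration might look like the following. The DSN environment variable name and the default_bus setting are assumptions for illustration, not the reporter's actual configuration; an AMQP transport is inferred from the redelivery error in the consumer log further down.

```yaml
framework:
    messenger:
        # Assumption: command.bus is the default bus; the issue only shows its name.
        default_bus: command.bus
        buses:
            command.bus: ~

        transports:
            # Assumption: the DSN comes from an env var with this name
            # (for example an amqp:// DSN).
            main_normal_priority:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'

        # Taken from the issue: both commands share one transport.
        routing:
            'App\Message\Command\FirstCommand': main_normal_priority
            'App\Message\Command\SecondCommand': main_normal_priority
```

With a setup like this, a single worker consuming main_normal_priority receives messages for both batch handlers, which is the situation the report describes.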

First handler:

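namespace App\MessageHandler\Command;

use App\Message\Command\FirstCommand;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

#[AsMessageHandler(bus: 'command.bus')]
class FirstCommandHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;
    private LoggerInterface $logger;

    public function __construct(
        ?LoggerInterface $logger = null,
    ) {
        $this->logger = $logger ?: new NullLogger();
    }

    public function __invoke(FirstCommand $message, ?Acknowledger $ack = null)
    {
        // Buffers the message; the trait calls process() when the batch is flushed.
        return $this->handle($message, $ack);
    }

    protected function process(array $jobs): void
    {
        // Each job is a [message, Acknowledger] pair.
        foreach ($jobs as [$message, $ack]) {
            try {
                $this->logger->info('First handler');
                $ack->ack($message);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}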

Second handler:

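namespace App\MessageHandler\Command;

use App\Message\Command\SecondCommand;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;

#[AsMessageHandler(bus: 'command.bus')]
class SecondCommandHandler implements BatchHandlerInterface
{
    use BatchHandlerTrait;
    private LoggerInterface $logger;

    public function __construct(
        ?LoggerInterface $logger = null,
    ) {
        $this->logger = $logger ?: new NullLogger();
    }

    public function __invoke(SecondCommand $message, ?Acknowledger $ack = null)
    {
        // Buffers the message; the trait calls process() when the batch is flushed.
        return $this->handle($message, $ack);
    }

    protected function process(array $jobs): void
    {
        // Each job is a [message, Acknowledger] pair.
        foreach ($jobs as [$message, $ack]) {
            try {
                $this->logger->info('Second handler');
                $ack->ack($message);
            } catch (\Throwable $e) {
                $ack->nack($e);
            }
        }
    }
}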

Dispatch a message for each handler:

        $this->commandBus->dispatch(new FirstCommand());
        $this->commandBus->dispatch(new SecondCommand());

Resulting output from consumer:

15:01:30 INFO      [messenger] Received message App\Message\Command\FirstCommand ["class" => "App\Message\Command\FirstCommand"]
15:01:30 INFO      [messenger] Message App\Message\Command\FirstCommand handled by App\MessageHandler\Command\FirstCommandHandler::__invoke ["class" => "App\Message\Command\FirstCommand","handler" => "App\MessageHandler\Command\FirstCommandHandler::__invoke"]
15:01:30 INFO      [messenger] Received message App\Message\Command\SecondCommand ["class" => "App\Message\Command\SecondCommand"]
15:01:30 INFO      [messenger] Message App\Message\Command\SecondCommand handled by App\MessageHandler\Command\SecondCommandHandler::__invoke ["class" => "App\Message\Command\SecondCommand","handler" => "App\MessageHandler\Command\SecondCommandHandler::__invoke"]
15:01:30 INFO      [messenger] Received message App\Message\Command\FirstCommand ["class" => "App\Message\Command\FirstCommand"]
15:01:30 INFO      [app] First handler
15:01:30 INFO      [messenger] App\Message\Command\FirstCommand was handled successfully (acknowledging to transport). ["class" => "App\Message\Command\FirstCommand"]

The message for the second handler is now stuck in the queue in an unacked state: the consumer never logs "Second handler" or an acknowledgement for SecondCommand.

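Restarting the consumer at this point consumes the message. It is redelivered, rejected once by the redelivery check, and then handled on the retry: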

15:15:52 WARNING   [messenger] Error thrown while handling message App\Message\Command\SecondCommand. Sending for retry #1 using 1000 ms delay. Error: "Redelivered message from AMQP detected that will be rejected and trigger the retry logic." ["class" => "App\Message\Command\SecondCommand","retryCount" => 1,"delay" => 1000,"error" => "Redelivered message from AMQP detected that will be rejected and trigger the retry logic.","exception" => Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException^ { …}]
15:15:53 INFO      [messenger] Received message App\Message\Command\SecondCommand ["class" => "App\Message\Command\SecondCommand"]
15:15:53 INFO      [messenger] Message App\Message\Command\SecondCommand handled by App\MessageHandler\Command\SecondCommandHandler::__invoke ["class" => "App\Message\Command\SecondCommand","handler" => "App\MessageHandler\Command\SecondCommandHandler::__invoke"]
15:15:53 INFO      [messenger] Received message App\Message\Command\SecondCommand ["class" => "App\Message\Command\SecondCommand"]
15:15:53 INFO      [app] Second handler
15:15:53 INFO      [messenger] App\Message\Command\SecondCommand was handled successfully (acknowledging to transport). ["class" => "App\Message\Command\SecondCommand"]

Possible Solution

No response

Additional Context

No response
