[Messenger] Could not acknowledge redis message with BatchHandlerInterface  #44400

Open
@zip-fa

Description


Symfony version(s) affected

5.4.0, 6.0.0

How to reproduce

I have a Messenger handler that makes HTTP requests. It is run by a single consumer (no parallel consumers):

messenger.yaml:

framework:
  messenger:
    transports:
      test:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%/test'
        options:
          delete_after_ack: true
          consumer: '%env(MESSENGER_CONSUMER_NAME)%'
        retry_strategy:
          max_retries: 0

    routing:
      'TestMessage': test

TestMessageHandler.php:

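<?php
use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;

class TestMessageHandler implements MessageHandlerInterface, BatchHandlerInterface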
{
    use BatchHandlerTrait;

    public function __construct(
        private HttpClientInterface $client
    ) { }

    public function __invoke(TestMessage $message, Acknowledger &$ack = null)
    {        
        return $this->handle($message, $ack);
    }
    
    private function shouldFlush(): bool
    {
        return 5 <= \count($this->jobs);
    }

    private function process(array $jobs): void
    {
        $responses = [];
        
        foreach ($jobs as [$job, $ack]) {          
            try {
                [$headers, $content] = prepareRequest();
                
                $responses[] = $this->client->request('POST', $job->getEndpoint(), [
                    'headers' => $headers,
                    'body' => $content
                ]);

                $ack->ack($job);
            } catch (\Exception $e) {
                $ack->nack($e);
            }
        }
        
        if(0 === count($responses)) {
          return;
        }

        foreach ($this->client->stream($responses) as $response => $chunk) {
            if ($chunk->isFirst()) {
                var_dump($response->getStatusCode());
            } else if ($chunk->isLast()) {
            }
        }
    }
}

cli:

MESSENGER_CONSUMER_NAME=test php bin/console messenger:consume test

The code works fine, but if I press Ctrl+C before the consumer has finished and then start it again, it throws this error:

In Connection.php line 441:

  Could not acknowledge redis message "1638374074049-0".

I noticed that after the consumer crashes or I press Ctrl+C, the same message is passed to __invoke several times. After removing BatchHandlerInterface, the handler works as expected without any issue.
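The symptom can be modelled with a small self-contained sketch. It assumes that an unacknowledged entry stays pending and is handed out again on every read, and that acknowledging an entry a second time fails. `FakeStream` is a hypothetical in-memory stand-in written for this illustration; it is not Symfony's Redis `Connection` class and may not match the real root cause.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for a stream consumer group (assumption, not Symfony code).
final class FakeStream
{
    /** @var array<string, string> pending entries, id => payload */
    private array $pending = [];

    public function add(string $id, string $payload): void
    {
        $this->pending[$id] = $payload;
    }

    /** Returns the oldest pending entry without removing it, or null when nothing is pending. */
    public function get(): ?array
    {
        foreach ($this->pending as $id => $payload) {
            return [(string) $id, $payload];
        }

        return null;
    }

    /** Removes the entry; acknowledging an id that is no longer pending throws. */
    public function ack(string $id): void
    {
        if (!isset($this->pending[$id])) {
            throw new RuntimeException(sprintf('Could not acknowledge redis message "%s".', $id));
        }
        unset($this->pending[$id]);
    }
}

$stream = new FakeStream();
$stream->add('1638374074049-0', 'job A');

// A batch handler buffers messages and acknowledges them only when it flushes,
// so the same pending id is read several times before any ack happens.
$buffer = [];
for ($i = 0; $i < 3; ++$i) {
    [$id] = $stream->get();
    $buffer[] = $id;
}

// Flushing acknowledges every buffered copy: the first ack succeeds, the rest fail.
$errors = [];
foreach ($buffer as $id) {
    try {
        $stream->ack($id);
    } catch (RuntimeException $e) {
        $errors[] = $e->getMessage();
    }
}
```

Under these assumptions the buffer holds three copies of the same id and two of the three acknowledgements fail, which matches the error message above.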

Possible Solution 1 (a bad one)

  1. Add a UUID to every message
  2. Destroy the Acknowledger and create a replacement for it (HandleMessageMiddleware.php:88)

Test.php:

class Test
{
  public function isAcknowledged()
  {
    return false;
  }
}

TestMessageHandler.php:

public function __invoke(TestMessage $message, Acknowledger &$ack = null)
{
    $uid = $message->getUuid();
    
    if(isset($this->jobs[$uid])) {
      $this->flush(true);
      
      try {
        $ack = null;
        unset($ack);
      } catch(\Exception) { }
      
      $ack = new Test();

      return 0;
    }
    
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();

    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);

        return $ack->getResult();
    }

    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }

    $this->flush(true);

    return 0;
}

Possible Solution 2

Comment out the __destruct method in Symfony\Component\Messenger\Handler\Acknowledger.php, then deduplicate by UUID in the handler:

Handler.php:

public function __invoke(TestMessage $message, Acknowledger $ack = null)
{
    $uid = $message->getUuid();
    
    if(isset($this->jobs[$uid])) {
      $this->flush(true);
    
      return 0;
    }
    
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();

    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);

        return $ack->getResult();
    }

    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }

    $this->flush(true);

    return 0;
}
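Both workarounds rely on the same idea: key the buffered jobs by UUID and, when a UUID that is already buffered arrives again, flush instead of buffering it twice. The following is a minimal standalone sketch of that idea only. `DedupBuffer`, `push()` and the `$processed` list are hypothetical names made up for this illustration; they are not part of BatchHandlerTrait, and the sketch leaves out acknowledgement entirely.

```php
<?php
declare(strict_types=1);

// Hypothetical buffer illustrating UUID-keyed deduplication (not BatchHandlerTrait).
final class DedupBuffer
{
    /** @var array<string, object> buffered jobs, uuid => message */
    private array $jobs = [];

    /** @var list<string> UUIDs handed to flush(), in order */
    public array $processed = [];

    public function __construct(private int $batchSize)
    {
    }

    /** Returns the number of buffered jobs, or 0 when this call triggered a flush. */
    public function push(string $uuid, object $message): int
    {
        if (isset($this->jobs[$uuid])) {
            // Redelivered message: flush the current batch instead of buffering a duplicate.
            $this->flush();

            return 0;
        }

        $this->jobs[$uuid] = $message;
        if (count($this->jobs) < $this->batchSize) {
            return count($this->jobs);
        }

        $this->flush();

        return 0;
    }

    private function flush(): void
    {
        foreach (array_keys($this->jobs) as $uuid) {
            $this->processed[] = (string) $uuid;
        }
        $this->jobs = [];
    }
}

$buffer = new DedupBuffer(5);
$first = $buffer->push('a', new stdClass());  // buffered
$second = $buffer->push('b', new stdClass()); // buffered
$third = $buffer->push('a', new stdClass());  // duplicate UUID: forces a flush
```

In this sketch each UUID is processed once even though 'a' was delivered twice. In the real handler the duplicate delivery still carries its own Acknowledger, which is why the workarounds above either replace it or disable its __destruct check.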

Additional Context

No response
