Description
Symfony version(s) affected
5.4.0, 6.0.0
How to reproduce
I have a Messenger handler that makes HTTP requests (with only one consumer running):
messenger.yaml:
transports:
    test:
        dsn: '%env(MESSENGER_TRANSPORT_DSN)%/test'
        options:
            delete_after_ack: true
            consumer: '%env(MESSENGER_CONSUMER_NAME)%'
        retry_strategy:
            max_retries: 0
routing:
    'TestMessage': test
TestMessageHandler.php:
<?php

use Symfony\Component\Messenger\Handler\Acknowledger;
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
use Symfony\Component\Messenger\Handler\BatchHandlerTrait;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;

class TestMessageHandler implements MessageHandlerInterface, BatchHandlerInterface
{
    use BatchHandlerTrait;

    public function __construct(
        private HttpClientInterface $client
    ) { }

    public function __invoke(TestMessage $message, Acknowledger &$ack = null)
    {
        return $this->handle($message, $ack);
    }

    // Flush once five jobs are buffered.
    private function shouldFlush(): bool
    {
        return 5 <= \count($this->jobs);
    }

    private function process(array $jobs): void
    {
        $responses = [];
        foreach ($jobs as [$job, $ack]) {
            try {
                // prepareRequest() is a helper that is not shown in this report.
                [$headers, $content] = prepareRequest();
                $responses[] = $this->client->request('POST', $job->getEndpoint(), [
                    'headers' => $headers,
                    'body' => $content
                ]);
                $ack->ack($job);
            } catch (\Exception $e) {
                $ack->nack($e);
            }
        }

        if (0 === count($responses)) {
            return;
        }

        foreach ($this->client->stream($responses) as $response => $chunk) {
            if ($chunk->isFirst()) {
                var_dump($response->getStatusCode());
            } else if ($chunk->isLast()) {
            }
        }
    }
}
cli:
MESSENGER_CONSUMER_NAME=test php bin/console messenger:consume test
The code works, but when I hit Ctrl+C before the consumer has finished and then start it again, it throws an error:
In Connection.php line 441:
Could not acknowledge redis message "1638374074049-0".
I noticed that after the consumer crashes or I hit Ctrl+C, the same message is passed to __invoke several times. After removing BatchHandlerInterface, it works as expected without any issue.
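The repeated delivery described above can be illustrated without Redis. The following is a minimal sketch, assuming that entries which were delivered but not yet acknowledged are served again before new ones, and that acknowledging an entry a second time fails. `FakeStream` is a hypothetical in-memory stand-in written for this illustration; it is not the Symfony Redis transport and may not match its exact behavior.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for a stream with a pending list.
 * It only mimics "delivered but unacknowledged entries are served again".
 */
final class FakeStream
{
    /** @var array<string, string> delivered but not yet acknowledged */
    private array $pending = [];

    /** @param array<string, string> $fresh id => payload, not yet delivered */
    public function __construct(private array $fresh)
    {
    }

    /** Returns the next message id, serving pending entries before fresh ones. */
    public function read(): ?string
    {
        if ([] !== $this->pending) {
            return array_key_first($this->pending);
        }
        if ([] === $this->fresh) {
            return null;
        }
        $id = array_key_first($this->fresh);
        $this->pending[$id] = $this->fresh[$id];
        unset($this->fresh[$id]);

        return $id;
    }

    /** Returns false when the id is no longer pending (assumed analogue of a failed acknowledge). */
    public function ack(string $id): bool
    {
        if (!isset($this->pending[$id])) {
            return false;
        }
        unset($this->pending[$id]);

        return true;
    }
}

$stream = new FakeStream(['1-0' => 'a', '2-0' => 'b']);

// A batching handler buffers each delivery and acknowledges only on flush,
// so the same pending id keeps coming back in the meantime.
$delivered = [];
for ($i = 0; $i < 3; ++$i) {
    $delivered[] = $stream->read();
}

// Flushing acknowledges every buffered delivery, duplicates included.
$ackResults = array_map(fn (string $id): bool => $stream->ack($id), $delivered);
```

In this sketch only the first acknowledge succeeds. The failed ones would correspond to the "Could not acknowledge redis message" error, if the transport behaves as assumed here.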
Possible Solution 1 (a bad one)
- Add a UUID to every message
- Destroy the Acknowledger and create a replacement for it (HandleMessageMiddleware.php:88)
Test.php:
class Test
{
    public function isAcknowledged()
    {
        return false;
    }
}
TestMessageHandler.php:
public function __invoke(SendPushMessage $message, Acknowledger &$ack = null)
{
    $uid = $message->getUuid();
    if(isset($this->jobs[$uid])) {
        $this->flush(true);
        try {
            $ack = null;
            unset($ack);
        } catch(\Exception) { }
        $ack = new Test();
        return 0;
    }
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();
    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);
        return $ack->getResult();
    }
    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }
    $this->flush(true);
    return 0;
}
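The core of the workaround above is that the job buffer is keyed by the message UUID, so a message that arrives again while it is still buffered triggers a flush instead of being buffered twice. A minimal framework-free sketch of that idea follows. `UuidKeyedBuffer`, its `add()` method, and the `$flushed` log are hypothetical names introduced for illustration, and acknowledgement is left out entirely.

```php
<?php
declare(strict_types=1);

/** Hypothetical buffer that mirrors the UUID check in the workaround above. */
final class UuidKeyedBuffer
{
    /** @var array<string, string> uuid => payload */
    private array $jobs = [];

    /** @var list<list<string>> UUIDs of each flushed batch, kept for inspection */
    public array $flushed = [];

    public function __construct(private int $batchSize)
    {
    }

    /** Returns the number of buffered jobs, or 0 when a flush happened. */
    public function add(string $uuid, string $payload): int
    {
        if (isset($this->jobs[$uuid])) {
            // The message is already buffered: flush now instead of buffering it twice.
            $this->flush();

            return 0;
        }

        $this->jobs[$uuid] = $payload;
        if (\count($this->jobs) < $this->batchSize) {
            return \count($this->jobs);
        }
        $this->flush();

        return 0;
    }

    private function flush(): void
    {
        if ([] === $this->jobs) {
            return;
        }
        $this->flushed[] = array_keys($this->jobs);
        $this->jobs = [];
    }
}

$buffer = new UuidKeyedBuffer(5);
$results = [
    $buffer->add('a', 'first'),
    $buffer->add('b', 'second'),
    $buffer->add('a', 'first'), // same UUID again: forces a flush
    $buffer->add('c', 'third'),
];
```

The duplicate delivery of `'a'` flushes the batch containing `'a'` and `'b'` early, and the buffer starts over with `'c'`.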
Possible Solution 2
Comment out the __destruct method in Symfony\Component\Messenger\Handler\Acknowledger.php
Handler.php:
public function __invoke(SendPushMessage $message, Acknowledger $ack = null)
{
    $uid = $message->getUuid();
    if(isset($this->jobs[$uid])) {
        $this->flush(true);
        return 0;
    }
    return $this->handle($message, $ack);
}

private function handle(object $message, ?Acknowledger $ack)
{
    $uid = $message->getUuid();
    if (null === $ack) {
        $ack = new Acknowledger(get_debug_type($this));
        $this->jobs[$uid] = [$message, $ack];
        $this->flush(true);
        return $ack->getResult();
    }
    $this->jobs[$uid] = [$message, $ack];
    if (!$this->shouldFlush()) {
        return \count($this->jobs);
    }
    $this->flush(true);
    return 0;
}
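Solution 2 matters because the Acknowledger appears to check in its destructor whether it was ever acked or nacked, so dropping a duplicate's acknowledger without calling it triggers that check. The sketch below shows the general pattern with a hypothetical `GuardedAck` class. It records a complaint instead of throwing, and it is a simplified stand-in, not Symfony's implementation.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for an object that must be "called" before it is destroyed. */
final class GuardedAck
{
    /** @var list<string> labels of instances destroyed without ack() */
    public static array $complaints = [];

    private bool $called = false;

    public function __construct(private string $label)
    {
    }

    public function ack(): void
    {
        $this->called = true;
    }

    public function __destruct()
    {
        // The guard: complain when the object goes away without ever being called.
        if (!$this->called) {
            self::$complaints[] = $this->label;
        }
    }
}

$kept = new GuardedAck('acked');
$kept->ack();
unset($kept); // destructor runs, no complaint

$dropped = new GuardedAck('dropped');
unset($dropped); // destructor runs and records a complaint
```

Commenting out the destructor silences this kind of guard, which is why the duplicate can then be discarded quietly. It also removes the protection against handlers that forget to acknowledge.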
Additional Context
No response