Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d642f76

Browse filesBrowse files
committed
feature #42163 [Messenger] [Redis] Prepare turning delete_after_ack to true in 6.0 (chalasr)
This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] [Redis] Prepare turning `delete_after_ack` to `true` in 6.0 | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | no | Deprecations? | yes | Tickets | Fix #42122 | License | MIT | Docs PR | todo Having this option turned on makes redis-messenger much more consistent with other transports, more adapted to the 80% use case and more importantly, prevents having to deal with OOM issues. Commits ------- b439a11 [Messenger][Redis] Prepare turning `delete_after_ack` to `true` in 6.0
2 parents 3955d39 + b439a11 commit d642f76
Copy full SHA for d642f76

File tree

7 files changed

+70
-44
lines changed
Filter options

7 files changed

+70
-44
lines changed

‎UPGRADE-5.4.md

Copy file name to clipboardExpand all lines: UPGRADE-5.4.md
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ HttpKernel
2323

2424
* Deprecate `AbstractTestSessionListener::getSession` inject a session in the request instead
2525

26+
Messenger
27+
---------
28+
29+
* Deprecate not setting the `delete_after_ack` config option (or DSN parameter) using the Redis transport,
30+
its default value will change to `true` in 6.0
31+
2632
SecurityBundle
2733
--------------
2834

‎UPGRADE-6.0.md

Copy file name to clipboardExpand all lines: UPGRADE-6.0.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ Messenger
152152
* The signature of method `RetryStrategyInterface::getWaitingTime()` has been updated to `RetryStrategyInterface::getWaitingTime(Envelope $message, \Throwable $throwable = null)`.
153153
* Removed the `prefetch_count` parameter in the AMQP bridge.
154154
* Removed the use of TLS option for Redis Bridge, use `rediss://127.0.0.1` instead of `redis://127.0.0.1?tls=1`
155+
* The `delete_after_ack` config option of the Redis transport now defaults to `true`
155156

156157
Mime
157158
----

‎src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/CHANGELOG.md
+6Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
CHANGELOG
22
=========
33

4+
5.4
5+
---
6+
7+
* Deprecate not setting the `delete_after_ack` config option (or DSN parameter),
8+
its default value will change to `true` in 6.0
9+
410
5.3
511
---
612

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/ConnectionTest.php
+50-39Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ConnectionTest extends TestCase
2828
public static function setUpBeforeClass(): void
2929
{
3030
try {
31-
$redis = Connection::fromDsn('redis://localhost/queue');
31+
$redis = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true');
3232
$redis->get();
3333
} catch (TransportException $e) {
3434
if (0 === strpos($e->getMessage(), 'ERR unknown command \'X')) {
@@ -61,11 +61,11 @@ public function testFromInvalidDsn()
6161
public function testFromDsn()
6262
{
6363
$this->assertEquals(
64-
new Connection(['stream' => 'queue'], [
64+
new Connection(['stream' => 'queue', 'delete_after_ack' => true], [
6565
'host' => 'localhost',
6666
'port' => 6379,
6767
]),
68-
Connection::fromDsn('redis://localhost/queue')
68+
Connection::fromDsn('redis://localhost/queue?delete_after_ack=1')
6969
);
7070
}
7171

@@ -80,33 +80,33 @@ public function testFromDsnWithMultipleHosts()
8080
}, $hosts);
8181
$dsn = implode(',', $dsn);
8282

83-
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn));
83+
$this->assertInstanceOf(Connection::class, Connection::fromDsn($dsn, ['delete_after_ack' => true]));
8484
}
8585

8686
public function testFromDsnOnUnixSocket()
8787
{
8888
$this->assertEquals(
89-
new Connection(['stream' => 'queue'], [
89+
new Connection(['stream' => 'queue', 'delete_after_ack' => true], [
9090
'host' => '/var/run/redis/redis.sock',
9191
'port' => 0,
9292
], [], $redis = $this->createMock(\Redis::class)),
93-
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue'], $redis)
93+
Connection::fromDsn('redis:///var/run/redis/redis.sock', ['stream' => 'queue', 'delete_after_ack' => true], $redis)
9494
);
9595
}
9696

9797
public function testFromDsnWithOptions()
9898
{
9999
$this->assertEquals(
100-
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
101-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
100+
Connection::fromDsn('redis://localhost', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2, 'delete_after_ack' => true]),
101+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0&delete_after_ack=1')
102102
);
103103
}
104104

105105
public function testFromDsnWithOptionsAndTrailingSlash()
106106
{
107107
$this->assertEquals(
108-
Connection::fromDsn('redis://localhost/', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2]),
109-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0')
108+
Connection::fromDsn('redis://localhost/', ['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'auto_setup' => false, 'serializer' => 2, 'delete_after_ack' => true]),
109+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&auto_setup=0&delete_after_ack=1')
110110
);
111111
}
112112

@@ -146,32 +146,32 @@ public function testFromDsnWithRedissScheme()
146146
->with('tls://127.0.0.1', 6379)
147147
->willReturn(null);
148148

149-
Connection::fromDsn('rediss://127.0.0.1', [], $redis);
149+
Connection::fromDsn('rediss://127.0.0.1?delete_after_ack=true', [], $redis);
150150
}
151151

152152
public function testFromDsnWithQueryOptions()
153153
{
154154
$this->assertEquals(
155-
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1'], [
155+
new Connection(['stream' => 'queue', 'group' => 'group1', 'consumer' => 'consumer1', 'delete_after_ack' => true], [
156156
'host' => 'localhost',
157157
'port' => 6379,
158158
], [
159159
'serializer' => 2,
160160
]),
161-
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2')
161+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?serializer=2&delete_after_ack=1')
162162
);
163163
}
164164

165165
public function testFromDsnWithMixDsnQueryOptions()
166166
{
167167
$this->assertEquals(
168-
Connection::fromDsn('redis://localhost/queue/group1?serializer=2', ['consumer' => 'specific-consumer']),
169-
Connection::fromDsn('redis://localhost/queue/group1/specific-consumer?serializer=2')
168+
Connection::fromDsn('redis://localhost/queue/group1?serializer=2', ['consumer' => 'specific-consumer', 'delete_after_ack' => true]),
169+
Connection::fromDsn('redis://localhost/queue/group1/specific-consumer?serializer=2&delete_after_ack=1')
170170
);
171171

172172
$this->assertEquals(
173-
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['consumer' => 'specific-consumer']),
174-
Connection::fromDsn('redis://localhost/queue/group1/consumer1')
173+
Connection::fromDsn('redis://localhost/queue/group1/consumer1', ['consumer' => 'specific-consumer', 'delete_after_ack' => true]),
174+
Connection::fromDsn('redis://localhost/queue/group1/consumer1?delete_after_ack=1')
175175
);
176176
}
177177

@@ -200,7 +200,7 @@ public function testKeepGettingPendingMessages()
200200
->with('symfony', 'consumer', ['queue' => 0], 1, null)
201201
->willReturn(['queue' => [['message' => '{"body":"Test","headers":[]}']]]);
202202

203-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
203+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
204204
$this->assertNotNull($connection->get());
205205
$this->assertNotNull($connection->get());
206206
$this->assertNotNull($connection->get());
@@ -214,7 +214,7 @@ public function testAuth()
214214
->with('password')
215215
->willReturn(true);
216216

217-
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
217+
Connection::fromDsn('redis://password@localhost/queue', ['delete_after_ack' => true], $redis);
218218
}
219219

220220
public function testAuthFromOptions()
@@ -225,7 +225,7 @@ public function testAuthFromOptions()
225225
->with('password')
226226
->willReturn(true);
227227

228-
Connection::fromDsn('redis://localhost/queue', ['auth' => 'password'], $redis);
228+
Connection::fromDsn('redis://localhost/queue', ['auth' => 'password', 'delete_after_ack' => true], $redis);
229229
}
230230

231231
public function testAuthFromOptionsAndDsn()
@@ -236,7 +236,7 @@ public function testAuthFromOptionsAndDsn()
236236
->with('password2')
237237
->willReturn(true);
238238

239-
Connection::fromDsn('redis://password1@localhost/queue', ['auth' => 'password2'], $redis);
239+
Connection::fromDsn('redis://password1@localhost/queue', ['auth' => 'password2', 'delete_after_ack' => true], $redis);
240240
}
241241

242242
public function testNoAuthWithEmptyPassword()
@@ -247,7 +247,7 @@ public function testNoAuthWithEmptyPassword()
247247
->with('')
248248
->willThrowException(new \RuntimeException());
249249

250-
Connection::fromDsn('redis://@localhost/queue', [], $redis);
250+
Connection::fromDsn('redis://@localhost/queue', ['delete_after_ack' => true], $redis);
251251
}
252252

253253
public function testAuthZeroPassword()
@@ -258,7 +258,7 @@ public function testAuthZeroPassword()
258258
->with('0')
259259
->willReturn(true);
260260

261-
Connection::fromDsn('redis://0@localhost/queue', [], $redis);
261+
Connection::fromDsn('redis://0@localhost/queue', ['delete_after_ack' => true], $redis);
262262
}
263263

264264
public function testFailedAuth()
@@ -271,14 +271,14 @@ public function testFailedAuth()
271271
->with('password')
272272
->willReturn(false);
273273

274-
Connection::fromDsn('redis://password@localhost/queue', [], $redis);
274+
Connection::fromDsn('redis://password@localhost/queue', ['delete_after_ack' => true], $redis);
275275
}
276276

277277
public function testDbIndex()
278278
{
279279
$redis = new \Redis();
280280

281-
Connection::fromDsn('redis://localhost/queue?dbindex=2', [], $redis);
281+
Connection::fromDsn('redis://localhost/queue?dbindex=2', ['delete_after_ack' => true], $redis);
282282

283283
$this->assertSame(2, $redis->getDbNum());
284284
}
@@ -291,7 +291,7 @@ public function testGetPendingMessageFirst()
291291
->with('symfony', 'consumer', ['queue' => '0'], 1, null)
292292
->willReturn(['queue' => [['message' => '{"body":"1","headers":[]}']]]);
293293

294-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
294+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
295295
$connection->get();
296296
}
297297

@@ -317,7 +317,7 @@ public function testClaimAbandonedMessageWithRaceCondition()
317317
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
318318
->willReturn([]);
319319

320-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
320+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
321321
$connection->get();
322322
}
323323

@@ -345,7 +345,7 @@ public function testClaimAbandonedMessage()
345345
->with('queue', 'symfony', 'consumer', 3600000, ['redisid-123'], ['JUSTID'])
346346
->willReturn([]);
347347

348-
$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
348+
$connection = Connection::fromDsn('redis://localhost/queue', ['delete_after_ack' => true], $redis);
349349
$connection->get();
350350
}
351351

@@ -357,22 +357,22 @@ public function testUnexpectedRedisError()
357357
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
358358
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');
359359

360-
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
360+
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false, 'delete_after_ack' => true], $redis);
361361
$connection->get();
362362
}
363363

364364
public function testGetAfterReject()
365365
{
366366
$redis = new \Redis();
367-
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', [], $redis);
367+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['delete_after_ack' => true], $redis);
368368

369369
$connection->add('1', []);
370370
$connection->add('2', []);
371371

372372
$failing = $connection->get();
373373
$connection->reject($failing['id']);
374374

375-
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget');
375+
$connection = Connection::fromDsn('redis://localhost/messenger-rejectthenget', ['delete_after_ack' => true]);
376376
$this->assertNotNull($connection->get());
377377

378378
$redis->del('messenger-rejectthenget');
@@ -382,7 +382,7 @@ public function testGetNonBlocking()
382382
{
383383
$redis = new \Redis();
384384

385-
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', [], $redis);
385+
$connection = Connection::fromDsn('redis://localhost/messenger-getnonblocking', ['delete_after_ack' => true], $redis);
386386

387387
$this->assertNull($connection->get()); // no message, should return null immediately
388388
$connection->add('1', []);
@@ -394,7 +394,7 @@ public function testGetNonBlocking()
394394
public function testJsonError()
395395
{
396396
$redis = new \Redis();
397-
$connection = Connection::fromDsn('redis://localhost/json-error', [], $redis);
397+
$connection = Connection::fromDsn('redis://localhost/json-error', ['delete_after_ack' => true], $redis);
398398
try {
399399
$connection->add("\xB1\x31", []);
400400
} catch (TransportException $e) {
@@ -411,7 +411,7 @@ public function testMaxEntries()
411411
->with('queue', '*', ['message' => '{"body":"1","headers":[]}'], 20000, true)
412412
->willReturn(1);
413413

414-
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', [], $redis); // 1 = always
414+
$connection = Connection::fromDsn('redis://localhost/queue?stream_max_entries=20000', ['delete_after_ack' => true], $redis);
415415
$connection->add('1', []);
416416
}
417417

@@ -426,10 +426,20 @@ public function testDeleteAfterAck()
426426
->with('queue', ['1'])
427427
->willReturn(1);
428428

429-
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis); // 1 = always
429+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_ack=true', [], $redis);
430430
$connection->ack('1');
431431
}
432432

433+
/**
434+
* @group legacy
435+
*/
436+
public function testLegacyOmitDeleteAfterAck()
437+
{
438+
$this->expectDeprecation('Since symfony/redis-messenger 5.4: Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
439+
440+
Connection::fromDsn('redis://localhost/queue');
441+
}
442+
433443
public function testDeleteAfterReject()
434444
{
435445
$redis = $this->createMock(\Redis::class);
@@ -441,7 +451,7 @@ public function testDeleteAfterReject()
441451
->with('queue', ['1'])
442452
->willReturn(1);
443453

444-
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', [], $redis); // 1 = always
454+
$connection = Connection::fromDsn('redis://localhost/queue?delete_after_reject=true', ['delete_after_ack' => true], $redis);
445455
$connection->reject('1');
446456
}
447457

@@ -455,7 +465,7 @@ public function testLastErrorGetsCleared()
455465
$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
456466
$redis->expects($this->exactly(2))->method('clearLastError');
457467

458-
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);
468+
$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false, 'delete_after_ack' => true], $redis);
459469

460470
try {
461471
$connection->add('message', []);
@@ -475,7 +485,7 @@ public function testLastErrorGetsCleared()
475485
public function testLazy()
476486
{
477487
$redis = new \Redis();
478-
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', [], $redis);
488+
$connection = Connection::fromDsn('redis://localhost/messenger-lazy?lazy=1', ['delete_after_ack' => true], $redis);
479489

480490
$connection->add('1', []);
481491
$this->assertNotEmpty($message = $connection->get());
@@ -490,7 +500,8 @@ public function testLazyCluster()
490500

491501
$connection = new Connection(
492502
['lazy' => true],
493-
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))]
503+
['host' => explode(' ', getenv('REDIS_CLUSTER_HOSTS'))],
504+
['delete_after_ack' => true]
494505
);
495506

496507
$connection->add('1', []);

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisExtIntegrationTest.php
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ protected function setUp(): void
3333

3434
try {
3535
$this->redis = new \Redis();
36-
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
36+
$this->connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
3737
$this->connection->cleanup();
3838
$this->connection->setup();
3939
} catch (\Exception $e) {
@@ -109,7 +109,7 @@ public function testConnectionSendDelayedMessagesWithSameContent()
109109
public function testConnectionBelowRedeliverTimeout()
110110
{
111111
// lower redeliver timeout and claim interval
112-
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), [], $this->redis);
112+
$connection = Connection::fromDsn(getenv('MESSENGER_REDIS_DSN'), ['delete_after_ack' => true], $this->redis);
113113

114114
$connection->cleanup();
115115
$connection->setup();
@@ -137,7 +137,7 @@ public function testConnectionClaimAndRedeliver()
137137
// lower redeliver timeout and claim interval
138138
$connection = Connection::fromDsn(
139139
getenv('MESSENGER_REDIS_DSN'),
140-
['redeliver_timeout' => 0, 'claim_interval' => 500],
140+
['redeliver_timeout' => 0, 'claim_interval' => 500, 'delete_after_ack' => true],
141141
$this->redis
142142
);
143143

‎src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportFactoryTest.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Tests/Transport/RedisTransportFactoryTest.php
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ public function testCreateTransport()
4040

4141
$factory = new RedisTransportFactory();
4242
$serializer = $this->createMock(SerializerInterface::class);
43-
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar']), $serializer);
43+
$expectedTransport = new RedisTransport(Connection::fromDsn('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar', 'delete_after_ack' => true]), $serializer);
4444

45-
$this->assertEquals($expectedTransport, $factory->createTransport('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar'], $serializer));
45+
$this->assertEquals($expectedTransport, $factory->createTransport('redis://'.getenv('REDIS_HOST'), ['stream' => 'bar', 'delete_after_ack' => true], $serializer));
4646
}
4747

4848
private function skipIfRedisUnavailable()

‎src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php

Copy file name to clipboardExpand all lines: src/Symfony/Component/Messenger/Bridge/Redis/Transport/Connection.php
+2Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,8 @@ public static function fromDsn(string $dsn, array $redisOptions = [], $redis = n
182182
if (\array_key_exists('delete_after_ack', $redisOptions)) {
183183
$deleteAfterAck = filter_var($redisOptions['delete_after_ack'], \FILTER_VALIDATE_BOOLEAN);
184184
unset($redisOptions['delete_after_ack']);
185+
} else {
186+
trigger_deprecation('symfony/redis-messenger', '5.4', 'Not setting the "delete_after_ack" boolean option explicitly is deprecated, its default value will change to true in 6.0.');
185187
}
186188

187189
$deleteAfterReject = null;

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.