@@ -229,6 +229,8 @@ def test_send(cli, conn):
229
229
def test_poll (mocker ):
230
230
metadata = mocker .patch .object (KafkaClient , '_maybe_refresh_metadata' )
231
231
_poll = mocker .patch .object (KafkaClient , '_poll' )
232
+ ifrs = mocker .patch .object (KafkaClient , 'in_flight_request_count' )
233
+ ifrs .return_value = 1
232
234
cli = KafkaClient (api_version = (0 , 9 ))
233
235
234
236
# metadata timeout wins
@@ -245,6 +247,11 @@ def test_poll(mocker):
245
247
cli .poll ()
246
248
_poll .assert_called_with (cli .config ['request_timeout_ms' ] / 1000.0 )
247
249
250
+ # If no in-flight-requests, drop timeout to retry_backoff_ms
251
+ ifrs .return_value = 0
252
+ cli .poll ()
253
+ _poll .assert_called_with (cli .config ['retry_backoff_ms' ] / 1000.0 )
254
+
248
255
249
256
def test__poll ():
250
257
pass
@@ -300,12 +307,14 @@ def client(mocker):
300
307
301
308
def test_maybe_refresh_metadata_ttl (mocker , client ):
302
309
client .cluster .ttl .return_value = 1234
310
+ mocker .patch .object (KafkaClient , 'in_flight_request_count' , return_value = 1 )
303
311
304
312
client .poll (timeout_ms = 12345678 )
305
313
client ._poll .assert_called_with (1.234 )
306
314
307
315
308
316
def test_maybe_refresh_metadata_backoff (mocker , client ):
317
+ mocker .patch .object (KafkaClient , 'in_flight_request_count' , return_value = 1 )
309
318
now = time .time ()
310
319
t = mocker .patch ('time.time' )
311
320
t .return_value = now
@@ -316,6 +325,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
316
325
317
326
def test_maybe_refresh_metadata_in_progress (mocker , client ):
318
327
client ._metadata_refresh_in_progress = True
328
+ mocker .patch .object (KafkaClient , 'in_flight_request_count' , return_value = 1 )
319
329
320
330
client .poll (timeout_ms = 12345678 )
321
331
client ._poll .assert_called_with (9999.999 ) # request_timeout_ms
@@ -324,6 +334,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
324
334
def test_maybe_refresh_metadata_update (mocker , client ):
325
335
mocker .patch .object (client , 'least_loaded_node' , return_value = 'foobar' )
326
336
mocker .patch .object (client , '_can_send_request' , return_value = True )
337
+ mocker .patch .object (KafkaClient , 'in_flight_request_count' , return_value = 1 )
327
338
send = mocker .patch .object (client , 'send' )
328
339
329
340
client .poll (timeout_ms = 12345678 )
@@ -338,6 +349,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
338
349
mocker .patch .object (client , '_can_connect' , return_value = True )
339
350
mocker .patch .object (client , '_maybe_connect' , return_value = True )
340
351
mocker .patch .object (client , 'maybe_connect' , return_value = True )
352
+ mocker .patch .object (KafkaClient , 'in_flight_request_count' , return_value = 1 )
341
353
342
354
now = time .time ()
343
355
t = mocker .patch ('time.time' )
0 commit comments