Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 9f0b518

Browse filesBrowse files
dpkpjeffwidman
authored andcommitted
Reduce client poll timeout when no ifrs
1 parent 5bb1abd commit 9f0b518
Copy full SHA for 9f0b518

File tree

2 files changed

+15
-0
lines changed
Filter options

2 files changed

+15
-0
lines changed

‎kafka/client_async.py

Copy file name to clipboardExpand all lines: kafka/client_async.py
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,6 +588,9 @@ def poll(self, timeout_ms=None, future=None):
588588
metadata_timeout_ms,
589589
idle_connection_timeout_ms,
590590
self.config['request_timeout_ms'])
591+
# if there are no requests in flight, do not block longer than the retry backoff
592+
if self.in_flight_request_count() == 0:
593+
timeout = min(timeout, self.config['retry_backoff_ms'])
591594
timeout = max(0, timeout / 1000) # avoid negative timeouts
592595

593596
self._poll(timeout)

‎test/test_client_async.py

Copy file name to clipboardExpand all lines: test/test_client_async.py
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ def test_send(cli, conn):
229229
def test_poll(mocker):
230230
metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata')
231231
_poll = mocker.patch.object(KafkaClient, '_poll')
232+
ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count')
233+
ifrs.return_value = 1
232234
cli = KafkaClient(api_version=(0, 9))
233235

234236
# metadata timeout wins
@@ -245,6 +247,11 @@ def test_poll(mocker):
245247
cli.poll()
246248
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)
247249

250+
# If no in-flight-requests, drop timeout to retry_backoff_ms
251+
ifrs.return_value = 0
252+
cli.poll()
253+
_poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0)
254+
248255

249256
def test__poll():
250257
pass
@@ -300,12 +307,14 @@ def client(mocker):
300307

301308
def test_maybe_refresh_metadata_ttl(mocker, client):
302309
client.cluster.ttl.return_value = 1234
310+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
303311

304312
client.poll(timeout_ms=12345678)
305313
client._poll.assert_called_with(1.234)
306314

307315

308316
def test_maybe_refresh_metadata_backoff(mocker, client):
317+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
309318
now = time.time()
310319
t = mocker.patch('time.time')
311320
t.return_value = now
@@ -316,6 +325,7 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
316325

317326
def test_maybe_refresh_metadata_in_progress(mocker, client):
318327
client._metadata_refresh_in_progress = True
328+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
319329

320330
client.poll(timeout_ms=12345678)
321331
client._poll.assert_called_with(9999.999) # request_timeout_ms
@@ -324,6 +334,7 @@ def test_maybe_refresh_metadata_in_progress(mocker, client):
324334
def test_maybe_refresh_metadata_update(mocker, client):
325335
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
326336
mocker.patch.object(client, '_can_send_request', return_value=True)
337+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
327338
send = mocker.patch.object(client, 'send')
328339

329340
client.poll(timeout_ms=12345678)
@@ -338,6 +349,7 @@ def test_maybe_refresh_metadata_cant_send(mocker, client):
338349
mocker.patch.object(client, '_can_connect', return_value=True)
339350
mocker.patch.object(client, '_maybe_connect', return_value=True)
340351
mocker.patch.object(client, 'maybe_connect', return_value=True)
352+
mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1)
341353

342354
now = time.time()
343355
t = mocker.patch('time.time')

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.