Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit c903d73

Browse filesBrowse files
authored
Fix chunked query to return chunk resultsets (#753)
When querying large data sets, it's vital to get a chunked responses to manage memory usage. Wrapping the query response in a generator and streaming the request provides the desired result. It also fixes `InfluxDBClient.query()` behavior for chunked queries that is currently not working according to [specs](https://github.com/influxdata/influxdb-python/blob/master/influxdb/client.py#L429) Closes #585. Closes #531. Closes #538.
1 parent d6192a7 commit c903d73
Copy full SHA for c903d73

File tree

2 files changed

+25
-29
lines changed
Filter options

2 files changed

+25
-29
lines changed

‎influxdb/client.py

Copy file name to clipboardExpand all lines: influxdb/client.py
+7-3Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ def switch_user(self, username, password):
249249
self._username = username
250250
self._password = password
251251

252-
def request(self, url, method='GET', params=None, data=None,
252+
def request(self, url, method='GET', params=None, data=None, stream=False,
253253
expected_response_code=200, headers=None):
254254
"""Make a HTTP request to the InfluxDB API.
255255
@@ -261,6 +261,8 @@ def request(self, url, method='GET', params=None, data=None,
261261
:type params: dict
262262
:param data: the data of the request, defaults to None
263263
:type data: str
264+
:param stream: True if a query uses chunked responses
265+
:type stream: bool
264266
:param expected_response_code: the expected response code of
265267
the request, defaults to 200
266268
:type expected_response_code: int
@@ -312,6 +314,7 @@ def request(self, url, method='GET', params=None, data=None,
312314
auth=(self._username, self._password),
313315
params=params,
314316
data=data,
317+
stream=stream,
315318
headers=headers,
316319
proxies=self._proxies,
317320
verify=self._verify_ssl,
@@ -398,17 +401,17 @@ def write(self, data, params=None, expected_response_code=204,
398401

399402
@staticmethod
400403
def _read_chunked_response(response, raise_errors=True):
401-
result_set = {}
402404
for line in response.iter_lines():
403405
if isinstance(line, bytes):
404406
line = line.decode('utf-8')
405407
data = json.loads(line)
408+
result_set = {}
406409
for result in data.get('results', []):
407410
for _key in result:
408411
if isinstance(result[_key], list):
409412
result_set.setdefault(
410413
_key, []).extend(result[_key])
411-
return ResultSet(result_set, raise_errors=raise_errors)
414+
yield ResultSet(result_set, raise_errors=raise_errors)
412415

413416
def query(self,
414417
query,
@@ -499,6 +502,7 @@ def query(self,
499502
method=method,
500503
params=params,
501504
data=None,
505+
stream=chunked,
502506
expected_response_code=expected_response_code
503507
)
504508

‎influxdb/tests/client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/client_test.py
+18-26Lines changed: 18 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,40 +1400,32 @@ def test_invalid_port_fails(self):
14001400
def test_chunked_response(self):
14011401
"""Test chunked reponse for TestInfluxDBClient object."""
14021402
example_response = \
1403-
u'{"results":[{"statement_id":0,"series":' \
1404-
'[{"name":"cpu","columns":["fieldKey","fieldType"],"values":' \
1405-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1406-
'[{"statement_id":0,"series":[{"name":"iops","columns":' \
1407-
'["fieldKey","fieldType"],"values":[["value","integer"]]}],' \
1408-
'"partial":true}]}\n{"results":[{"statement_id":0,"series":' \
1409-
'[{"name":"load","columns":["fieldKey","fieldType"],"values":' \
1410-
'[["value","integer"]]}],"partial":true}]}\n{"results":' \
1411-
'[{"statement_id":0,"series":[{"name":"memory","columns":' \
1412-
'["fieldKey","fieldType"],"values":[["value","integer"]]}]}]}\n'
1403+
u'{"results":[{"statement_id":0,"series":[{"columns":["key"],' \
1404+
'"values":[["cpu"],["memory"],["iops"],["network"]],"partial":' \
1405+
'true}],"partial":true}]}\n{"results":[{"statement_id":0,' \
1406+
'"series":[{"columns":["key"],"values":[["qps"],["uptime"],' \
1407+
'["df"],["mount"]]}]}]}\n'
14131408

14141409
with requests_mock.Mocker() as m:
14151410
m.register_uri(
14161411
requests_mock.GET,
14171412
"http://localhost:8086/query",
14181413
text=example_response
14191414
)
1420-
response = self.cli.query('show series limit 4 offset 0',
1415+
response = self.cli.query('show series',
14211416
chunked=True, chunk_size=4)
1422-
self.assertTrue(len(response) == 4)
1423-
self.assertEqual(response.__repr__(), ResultSet(
1424-
{'series': [{'values': [['value', 'integer']],
1425-
'name': 'cpu',
1426-
'columns': ['fieldKey', 'fieldType']},
1427-
{'values': [['value', 'integer']],
1428-
'name': 'iops',
1429-
'columns': ['fieldKey', 'fieldType']},
1430-
{'values': [['value', 'integer']],
1431-
'name': 'load',
1432-
'columns': ['fieldKey', 'fieldType']},
1433-
{'values': [['value', 'integer']],
1434-
'name': 'memory',
1435-
'columns': ['fieldKey', 'fieldType']}]}
1436-
).__repr__())
1417+
res = list(response)
1418+
self.assertTrue(len(res) == 2)
1419+
self.assertEqual(res[0].__repr__(), ResultSet(
1420+
{'series': [{
1421+
'columns': ['key'],
1422+
'values': [['cpu'], ['memory'], ['iops'], ['network']]
1423+
}]}).__repr__())
1424+
self.assertEqual(res[1].__repr__(), ResultSet(
1425+
{'series': [{
1426+
'columns': ['key'],
1427+
'values': [['qps'], ['uptime'], ['df'], ['mount']]
1428+
}]}).__repr__())
14371429

14381430

14391431
class FakeClient(InfluxDBClient):

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.