Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a1c2615

Browse filesBrowse files
authored
fix: parsing response with new line in column [async/await] (influxdata#497)
1 parent 9e9a10c commit a1c2615
Copy full SHA for a1c2615

File tree

Expand file treeCollapse file tree

5 files changed

+64
-10
lines changed
Filter options
Expand file treeCollapse file tree

5 files changed

+64
-10
lines changed

‎CHANGELOG.md

Copy file name to clipboardExpand all lines: CHANGELOG.md
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## 1.33.0 [unreleased]
22

3+
### Bug Fixes
4+
1. [#497](https://github.com/influxdata/influxdb-client-python/pull/497): Parsing InfluxDB response with new line character in CSV column [async/await]
5+
36
## 1.32.0 [2022-08-25]
47

58
:warning: This release drop supports for Python 3.6. As of 2021-12-23, 3.6 has reached the end-of-life phase of its release cycle. 3.6.15 was the final security release. For more info see: https://peps.python.org/pep-0494/#lifespan

‎README.rst

Copy file name to clipboardExpand all lines: README.rst
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,12 +1371,12 @@ How to use Asyncio
13711371
.. marker-asyncio-start
13721372
13731373
Starting from version 1.27.0 for Python 3.7+ the ``influxdb-client`` package supports ``async/await`` based on
1374-
`asyncio <https://docs.python.org/3/library/asyncio.html>`_ and `aiohttp <https://docs.aiohttp.org>`_.
1375-
You can install ``aiohttp`` directly:
1374+
`asyncio <https://docs.python.org/3/library/asyncio.html>`_, `aiohttp <https://docs.aiohttp.org>`_ and `aiocsv <https://pypi.org/project/aiocsv/>`_.
1375+
You can install ``aiohttp`` and ``aiocsv`` directly:
13761376

13771377
.. code-block:: bash
13781378
1379-
$ python -m pip install influxdb-client aiohttp
1379+
$ python -m pip install influxdb-client aiohttp aiocsv
13801380
13811381
or use the ``[async]`` extra:
13821382

‎influxdb_client/client/flux_csv_parser.py

Copy file name to clipboardExpand all lines: influxdb_client/client/flux_csv_parser.py
+13-6Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
9999

100100
async def __aenter__(self) -> 'FluxCsvParser':
101101
"""Initialize CSV reader."""
102-
self._reader = self._response.content
102+
from aiocsv import AsyncReader
103+
self._reader = AsyncReader(_StreamReaderToWithAsyncRead(self._response.content))
103104

104105
return self
105106

@@ -134,11 +135,9 @@ async def _parse_flux_response_async(self):
134135
metadata = _FluxCsvParserMetadata()
135136

136137
try:
137-
async for line in self._reader:
138-
csv = list(csv_parser.reader([line.decode(_UTF_8_encoding)]))
139-
if len(csv) >= 1:
140-
for val in self._parse_flux_response_row(metadata, csv[0]):
141-
yield val
138+
async for csv in self._reader:
139+
for val in self._parse_flux_response_row(metadata, csv):
140+
yield val
142141

143142
# Return latest DataFrame
144143
if (self._serialization_mode is FluxSerializationMode.dataFrame) & hasattr(self, '_data_frame'):
@@ -371,3 +370,11 @@ def _print_profiler_info(self, flux_record: FluxRecord):
371370
print(f"{name:<20}: \n\n{val}")
372371
elif val is not None:
373372
print(f"{name:<20}: {val:<20}")
373+
374+
375+
class _StreamReaderToWithAsyncRead:
376+
def __init__(self, response):
377+
self.response = response
378+
379+
async def read(self, size: int) -> str:
380+
return (await self.response.read(size)).decode(_UTF_8_encoding)

‎setup.py

Copy file name to clipboardExpand all lines: setup.py
+2-1Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
]
4040

4141
async_requires = [
42-
'aiohttp>=3.8.1'
42+
'aiohttp>=3.8.1',
43+
'aiocsv>=1.2.2'
4344
]
4445

4546
with open('README.rst', 'r') as f:

‎tests/test_InfluxDBClientAsync.py

Copy file name to clipboardExpand all lines: tests/test_InfluxDBClientAsync.py
+43Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from influxdb_client import Point, WritePrecision, BucketsService
1212
from influxdb_client.client.exceptions import InfluxDBError
1313
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
14+
from influxdb_client.client.query_api import QueryOptions
1415
from influxdb_client.client.warnings import MissingPivotFunction
1516
from tests.base_test import generate_name
1617

@@ -312,6 +313,48 @@ async def test_query_and_debug(self):
312313
results = await buckets_service.get_buckets()
313314
self.assertIn("my-bucket", list(map(lambda bucket: bucket.name, results.buckets)))
314315

316+
@async_test
317+
@aioresponses()
318+
async def test_parse_csv_with_new_lines_in_column(self, mocked):
319+
await self.client.close()
320+
self.client = InfluxDBClientAsync("http://localhost")
321+
mocked.post('http://localhost/api/v2/query?org=my-org', status=200, body='''#datatype,string,long,dateTime:RFC3339
322+
#group,false,false,false
323+
#default,_result,,
324+
,result,table,_time
325+
,,0,2022-09-09T10:22:13.744147091Z
326+
327+
#datatype,string,long,string,long,long,long,long,long,long,long,long,long,string,long,long,string
328+
#group,false,false,true,false,false,false,false,false,false,false,false,false,false,false,false,false
329+
#default,_profiler,,,,,,,,,,,,,,,
330+
,result,table,_measurement,TotalDuration,CompileDuration,QueueDuration,PlanDuration,RequeueDuration,ExecuteDuration,Concurrency,MaxAllocated,TotalAllocated,RuntimeErrors,influxdb/scanned-bytes,influxdb/scanned-values,flux/query-plan
331+
,,0,profiler/query,17305459,6292042,116958,0,0,10758125,0,448,0,,0,0,"digraph {
332+
""ReadRange4""
333+
""keep2""
334+
""limit3""
335+
336+
""ReadRange4"" -> ""keep2""
337+
""keep2"" -> ""limit3""
338+
}
339+
340+
"
341+
342+
#datatype,string,long,string,string,string,long,long,long,long,double
343+
#group,false,false,true,false,false,false,false,false,false,false
344+
#default,_profiler,,,,,,,,,
345+
,result,table,_measurement,Type,Label,Count,MinDuration,MaxDuration,DurationSum,MeanDuration
346+
,,1,profiler/operator,*influxdb.readFilterSource,ReadRange4,1,888209,888209,888209,888209
347+
,,1,profiler/operator,*universe.schemaMutationTransformation,keep2,4,1875,42042,64209,16052.25
348+
,,1,profiler/operator,*universe.limitTransformation,limit3,3,1333,38750,47874,15958''')
349+
350+
records = []
351+
await self.client\
352+
.query_api(QueryOptions(profilers=["operator", "query"],
353+
profiler_callback=lambda record: records.append(record))) \
354+
.query("buckets()", "my-org")
355+
356+
self.assertEqual(4, len(records))
357+
315358
async def _prepare_data(self, measurement: str):
316359
_point1 = Point(measurement).tag("location", "Prague").field("temperature", 25.3)
317360
_point2 = Point(measurement).tag("location", "New York").field("temperature", 24.3)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.