Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit fc0235e

Browse filesBrowse files
authored
Merge pull request #827 from rolincova/fix/data-frame-client-custom-indexes
Fix: add support for custom indexes for query in the DataFrameClient (#785)
2 parents 95e0efb + cb3156c commit fc0235e
Copy full SHA for fc0235e

File tree

6 files changed

+57
-13
lines changed
Filter options

6 files changed

+57
-13
lines changed

‎CHANGELOG.md

Copy file name to clipboardExpand all lines: CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
88

99
### Added
1010
- Add support for custom headers in the InfluxDBClient (#710 thx @nathanielatom)
11+
- Add support for custom indexes for query in the DataFrameClient (#785)
1112

1213
### Changed
1314
- Amend retry to avoid sleep after last retry before raising exception (#790 thx @krzysbaranski)

‎influxdb/_dataframe_client.py

Copy file name to clipboardExpand all lines: influxdb/_dataframe_client.py
+16-7Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@ def query(self,
152152
chunked=False,
153153
chunk_size=0,
154154
method="GET",
155-
dropna=True):
155+
dropna=True,
156+
data_frame_index=None):
156157
"""
157158
Query data into a DataFrame.
158159
@@ -181,6 +182,8 @@ def query(self,
181182
containing all results within that chunk
182183
:param chunk_size: Size of each chunk to tell InfluxDB to use.
183184
:param dropna: drop columns where all values are missing
185+
:param data_frame_index: the list of columns that
186+
are used as DataFrame index
184187
:returns: the queried data
185188
:rtype: :class:`~.ResultSet`
186189
"""
@@ -196,13 +199,14 @@ def query(self,
196199
results = super(DataFrameClient, self).query(query, **query_args)
197200
if query.strip().upper().startswith("SELECT"):
198201
if len(results) > 0:
199-
return self._to_dataframe(results, dropna)
202+
return self._to_dataframe(results, dropna,
203+
data_frame_index=data_frame_index)
200204
else:
201205
return {}
202206
else:
203207
return results
204208

205-
def _to_dataframe(self, rs, dropna=True):
209+
def _to_dataframe(self, rs, dropna=True, data_frame_index=None):
206210
result = defaultdict(list)
207211
if isinstance(rs, list):
208212
return map(self._to_dataframe, rs,
@@ -216,10 +220,15 @@ def _to_dataframe(self, rs, dropna=True):
216220
key = (name, tuple(sorted(tags.items())))
217221
df = pd.DataFrame(data)
218222
df.time = pd.to_datetime(df.time)
219-
df.set_index('time', inplace=True)
220-
if df.index.tzinfo is None:
221-
df.index = df.index.tz_localize('UTC')
222-
df.index.name = None
223+
224+
if data_frame_index:
225+
df.set_index(data_frame_index, inplace=True)
226+
else:
227+
df.set_index('time', inplace=True)
228+
if df.index.tzinfo is None:
229+
df.index = df.index.tz_localize('UTC')
230+
df.index.name = None
231+
223232
result[key].append(df)
224233
for key, data in result.items():
225234
df = pd.concat(data).sort_index()

‎influxdb/client.py

Copy file name to clipboardExpand all lines: influxdb/client.py
+2-2Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,7 @@ def alter_retention_policy(self, name, database=None,
860860
query_string = (
861861
"ALTER RETENTION POLICY {0} ON {1}"
862862
).format(quote_ident(name),
863-
quote_ident(database or self._database), shard_duration)
863+
quote_ident(database or self._database))
864864
if duration:
865865
query_string += " DURATION {0}".format(duration)
866866
if shard_duration:
@@ -958,7 +958,7 @@ def drop_user(self, username):
958958
:param username: the username to drop
959959
:type username: str
960960
"""
961-
text = "DROP USER {0}".format(quote_ident(username), method="POST")
961+
text = "DROP USER {0}".format(quote_ident(username))
962962
self.query(text, method="POST")
963963

964964
def set_user_password(self, username, password):

‎influxdb/helper.py

Copy file name to clipboardExpand all lines: influxdb/helper.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def __new__(cls, *args, **kwargs):
8282
allowed_time_precisions = ['h', 'm', 's', 'ms', 'u', 'ns', None]
8383
if cls._time_precision not in allowed_time_precisions:
8484
raise AttributeError(
85-
'In {0}, time_precision is set, but invalid use any of {}.'
85+
'In {}, time_precision is set, but invalid use any of {}.'
8686
.format(cls.__name__, ','.join(allowed_time_precisions)))
8787

8888
cls._retention_policy = getattr(_meta, 'retention_policy', None)

‎influxdb/influxdb08/client.py

Copy file name to clipboardExpand all lines: influxdb/influxdb08/client.py
+3-3Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,10 @@ def write_points(self, data, time_precision='s', *args, **kwargs):
292292
:type batch_size: int
293293
294294
"""
295-
def list_chunks(l, n):
295+
def list_chunks(data_list, n):
296296
"""Yield successive n-sized chunks from l."""
297-
for i in xrange(0, len(l), n):
298-
yield l[i:i + n]
297+
for i in xrange(0, len(data_list), n):
298+
yield data_list[i:i + n]
299299

300300
batch_size = kwargs.get('batch_size')
301301
if batch_size and batch_size > 0:

‎influxdb/tests/dataframe_client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/dataframe_client_test.py
+34Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1240,3 +1240,37 @@ def test_write_points_from_dataframe_with_tags_and_nan_json(self):
12401240
cli.write_points(dataframe, 'foo', tags=None, protocol='json',
12411241
tag_columns=['tag_one', 'tag_two'])
12421242
self.assertEqual(m.last_request.body, expected)
1243+
1244+
def test_query_custom_index(self):
1245+
"""Test query with custom indexes."""
1246+
data = {
1247+
"results": [
1248+
{
1249+
"series": [
1250+
{
1251+
"name": "cpu_load_short",
1252+
"columns": ["time", "value", "host"],
1253+
"values": [
1254+
[1, 0.55, "local"],
1255+
[2, 23422, "local"],
1256+
[3, 0.64, "local"]
1257+
]
1258+
}
1259+
]
1260+
}
1261+
]
1262+
}
1263+
1264+
cli = DataFrameClient('host', 8086, 'username', 'password', 'db')
1265+
iql = "SELECT value FROM cpu_load_short WHERE region=$region;" \
1266+
"SELECT count(value) FROM cpu_load_short WHERE region=$region"
1267+
bind_params = {'region': 'us-west'}
1268+
with _mocked_session(cli, 'GET', 200, data):
1269+
result = cli.query(iql, bind_params=bind_params,
1270+
data_frame_index=["time", "host"])
1271+
1272+
_data_frame = result['cpu_load_short']
1273+
print(_data_frame)
1274+
1275+
self.assertListEqual(["time", "host"],
1276+
list(_data_frame.index.names))

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.