Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 02ddf0e

Browse filesBrowse files
committed
Add time_precision support to line protocol.
1 parent e34acc1 commit 02ddf0e
Copy full SHA for 02ddf0e

File tree

5 files changed

+80
-43
lines changed
Filter options

5 files changed

+80
-43
lines changed

‎influxdb/_dataframe_client.py

Copy file name to clipboardExpand all lines: influxdb/_dataframe_client.py
+12-5Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,15 +55,14 @@ def write_points(self, dataframe, measurement, tags=None,
5555
end_index = (batch + 1) * batch_size
5656
points = self._convert_dataframe_to_json(
5757
dataframe.ix[start_index:end_index].copy(),
58-
measurement,
59-
tags
58+
measurement, tags, time_precision
6059
)
6160
super(DataFrameClient, self).write_points(
6261
points, time_precision, database, retention_policy)
6362
return True
6463
else:
6564
points = self._convert_dataframe_to_json(
66-
dataframe, measurement, tags
65+
dataframe, measurement, tags, time_precision
6766
)
6867
super(DataFrameClient, self).write_points(
6968
points, time_precision, database, retention_policy)
@@ -116,7 +115,8 @@ def _to_dataframe(self, rs):
116115
result[key] = df
117116
return result
118117

119-
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
118+
def _convert_dataframe_to_json(self, dataframe, measurement, tags=None,
119+
time_precision=None):
120120

121121
if not isinstance(dataframe, pd.DataFrame):
122122
raise TypeError('Must be DataFrame, but type was: {}.'
@@ -136,11 +136,18 @@ def _convert_dataframe_to_json(self, dataframe, measurement, tags=None):
136136
# Convert dtype for json serialization
137137
dataframe = dataframe.astype('object')
138138

139+
precision_factor = {
140+
"n": 1,
141+
"u": 1e3,
142+
"ms": 1e6,
143+
"s": 1e9
144+
}.get(time_precision, 1)
145+
139146
points = [
140147
{'measurement': measurement,
141148
'tags': tags if tags else {},
142149
'fields': rec,
143-
'time': ts.value
150+
'time': int(ts.value / precision_factor)
144151
}
145152
for ts, rec in zip(dataframe.index, dataframe.to_dict('record'))]
146153
return points

‎influxdb/client.py

Copy file name to clipboardExpand all lines: influxdb/client.py
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,11 +259,16 @@ def write(self, data, params=None, expected_response_code=204):
259259
headers = self._headers
260260
headers['Content-type'] = 'application/octet-stream'
261261

262+
if params:
263+
precision = params.get('precision')
264+
else:
265+
precision = None
266+
262267
self.request(
263268
url="write",
264269
method='POST',
265270
params=params,
266-
data=make_lines(data).encode('utf-8'),
271+
data=make_lines(data, precision).encode('utf-8'),
267272
expected_response_code=expected_response_code,
268273
headers=headers
269274
)

‎influxdb/line_protocol.py

Copy file name to clipboardExpand all lines: influxdb/line_protocol.py
+15-10Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,28 @@
66
from datetime import datetime
77

88
from dateutil.parser import parse
9-
from pytz import utc
109
from six import binary_type, text_type
1110

1211

13-
def _convert_timestamp(timestamp):
12+
def _convert_timestamp(timestamp, precision=None):
1413
if isinstance(timestamp, int):
15-
return timestamp
14+
return timestamp # assume precision is correct if timestamp is int
1615
if isinstance(_force_text(timestamp), text_type):
1716
timestamp = parse(timestamp)
1817
if isinstance(timestamp, datetime):
19-
if timestamp.tzinfo:
20-
timestamp = timestamp.astimezone(utc)
21-
timestamp.replace(tzinfo=None)
22-
return (
23-
timegm(timestamp.timetuple()) * 1e9 +
18+
ns = (
19+
timegm(timestamp.utctimetuple()) * 1e9 +
2420
timestamp.microsecond * 1e3
2521
)
22+
if precision is None or precision == 'n':
23+
return ns
24+
elif precision == 'u':
25+
return ns / 1e3
26+
elif precision == 'ms':
27+
return ns / 1e6
28+
elif precision == 's':
29+
return ns / 1e9
30+
2631
raise ValueError(timestamp)
2732

2833

@@ -58,7 +63,7 @@ def _force_text(data):
5863
return data
5964

6065

61-
def make_lines(data):
66+
def make_lines(data, precision=None):
6267
"""
6368
Extracts the points from the given dict and returns a Unicode string
6469
matching the line protocol introduced in InfluxDB 0.9.0.
@@ -103,7 +108,7 @@ def make_lines(data):
103108
# add timestamp
104109
if 'time' in point:
105110
timestamp = _force_text(str(int(
106-
_convert_timestamp(point['time'])
111+
_convert_timestamp(point['time'], precision)
107112
)))
108113
elements.append(timestamp)
109114

‎influxdb/tests/client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/client_test.py
+22-4Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,35 @@ def test_write_points_with_precision(self):
275275
)
276276

277277
cli = InfluxDBClient(database='db')
278-
cli.write_points(
279-
self.dummy_points,
280-
time_precision='n'
281-
)
282278

279+
cli.write_points(self.dummy_points, time_precision='n')
283280
self.assertEqual(
284281
b"cpu_load_short,host=server01,region=us-west "
285282
b"value=0.64 1257894000000000000\n",
286283
m.last_request.body,
287284
)
288285

286+
cli.write_points(self.dummy_points, time_precision='u')
287+
self.assertEqual(
288+
b"cpu_load_short,host=server01,region=us-west "
289+
b"value=0.64 1257894000000000\n",
290+
m.last_request.body,
291+
)
292+
293+
cli.write_points(self.dummy_points, time_precision='ms')
294+
self.assertEqual(
295+
b"cpu_load_short,host=server01,region=us-west "
296+
b"value=0.64 1257894000000\n",
297+
m.last_request.body,
298+
)
299+
300+
cli.write_points(self.dummy_points, time_precision='s')
301+
self.assertEqual(
302+
b"cpu_load_short,host=server01,region=us-west "
303+
b"value=0.64 1257894000\n",
304+
m.last_request.body,
305+
)
306+
289307
def test_write_points_bad_precision(self):
290308
cli = InfluxDBClient()
291309
with self.assertRaisesRegexp(

‎influxdb/tests/dataframe_client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/dataframe_client_test.py
+25-23Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -119,38 +119,40 @@ def test_write_points_from_dataframe_with_time_precision(self):
119119
"http://localhost:8086/write",
120120
status_code=204)
121121

122-
points = {
123-
'database': 'db',
124-
'points': [
125-
{'time': '1970-01-01T00:00:00+00:00',
126-
'fields': {
127-
'column_one': '1',
128-
'column_three': 1.0,
129-
'column_two': 1},
130-
'tags': {},
131-
'measurement': 'foo'},
132-
{'time': '1970-01-01T01:00:00+00:00',
133-
'fields': {
134-
'column_one': '2',
135-
'column_three': 2.0,
136-
'column_two': 2},
137-
'tags': {},
138-
'measurement': 'foo'}]
139-
}
140-
141122
cli = DataFrameClient(database='db')
142123
measurement = "foo"
143124

144125
cli.write_points(dataframe, measurement, time_precision='s')
145126
self.assertEqual(m.last_request.qs['precision'], ['s'])
127+
self.assertEqual(
128+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
129+
b'column_one="2",column_three=2.0,column_two=2 3600\n',
130+
m.last_request.body,
131+
)
146132

147-
cli.write_points(dataframe, measurement, time_precision='m')
148-
points.update(precision='m')
149-
self.assertEqual(m.last_request.qs['precision'], ['m'])
133+
cli.write_points(dataframe, measurement, time_precision='ms')
134+
self.assertEqual(m.last_request.qs['precision'], ['ms'])
135+
self.assertEqual(
136+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
137+
b'column_one="2",column_three=2.0,column_two=2 3600000\n',
138+
m.last_request.body,
139+
)
150140

151141
cli.write_points(dataframe, measurement, time_precision='u')
152-
points.update(precision='u')
153142
self.assertEqual(m.last_request.qs['precision'], ['u'])
143+
self.assertEqual(
144+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
145+
b'column_one="2",column_three=2.0,column_two=2 3600000000\n',
146+
m.last_request.body,
147+
)
148+
149+
cli.write_points(dataframe, measurement, time_precision='n')
150+
self.assertEqual(m.last_request.qs['precision'], ['n'])
151+
self.assertEqual(
152+
b'foo column_one="1",column_three=1.0,column_two=1 0\nfoo '
153+
b'column_one="2",column_three=2.0,column_two=2 3600000000000\n',
154+
m.last_request.body,
155+
)
154156

155157
@raises(TypeError)
156158
def test_write_points_from_dataframe_fails_without_time_index(self):

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.