Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit de44289

Browse filesBrowse files
authored
Make batched writing support all iterables (#746)
* Make batched writing support all iterables * Also test batching generator against real server * Fix PEP257 error * Import itertools functions directly
1 parent d590119 commit de44289
Copy full SHA for de44289

File tree

Expand file treeCollapse file tree

3 files changed

+69
-3
lines changed
Filter options
Expand file treeCollapse file tree

3 files changed

+69
-3
lines changed

‎influxdb/client.py

Copy file name to clipboardExpand all lines: influxdb/client.py
+12-3Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,11 @@
1515
import socket
1616
import struct
1717
import time
18+
from itertools import chain, islice
1819

1920
import msgpack
2021
import requests
2122
import requests.exceptions
22-
from six.moves import xrange
2323
from six.moves.urllib.parse import urlparse
2424

2525
from influxdb.line_protocol import make_lines, quote_ident, quote_literal
@@ -597,8 +597,17 @@ def ping(self):
597597

598598
@staticmethod
599599
def _batches(iterable, size):
600-
for i in xrange(0, len(iterable), size):
601-
yield iterable[i:i + size]
600+
# Iterate over an iterable producing iterables of batches. Based on:
601+
# http://code.activestate.com/recipes/303279-getting-items-in-batches/
602+
iterator = iter(iterable)
603+
while True:
604+
try: # Try get the first element in the iterator...
605+
head = (next(iterator),)
606+
except StopIteration:
607+
return # ...so that we can stop if there isn't one
608+
# Otherwise, lazily slice the rest of the batch
609+
rest = islice(iterator, size - 1)
610+
yield chain(head, rest)
602611

603612
def _write_points(self,
604613
points,

‎influxdb/tests/client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/client_test.py
+30Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,36 @@ def test_write_points_batch(self):
332332
self.assertEqual(expected_last_body,
333333
m.last_request.body.decode('utf-8'))
334334

335+
def test_write_points_batch_generator(self):
336+
"""Test write points batch from a generator for TestInfluxDBClient."""
337+
dummy_points = [
338+
{"measurement": "cpu_usage", "tags": {"unit": "percent"},
339+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
340+
{"measurement": "network", "tags": {"direction": "in"},
341+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
342+
{"measurement": "network", "tags": {"direction": "out"},
343+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
344+
]
345+
dummy_points_generator = (point for point in dummy_points)
346+
expected_last_body = (
347+
"network,direction=out,host=server01,region=us-west "
348+
"value=12.0 1257894000000000000\n"
349+
)
350+
351+
with requests_mock.Mocker() as m:
352+
m.register_uri(requests_mock.POST,
353+
"http://localhost:8086/write",
354+
status_code=204)
355+
cli = InfluxDBClient(database='db')
356+
cli.write_points(points=dummy_points_generator,
357+
database='db',
358+
tags={"host": "server01",
359+
"region": "us-west"},
360+
batch_size=2)
361+
self.assertEqual(m.call_count, 2)
362+
self.assertEqual(expected_last_body,
363+
m.last_request.body.decode('utf-8'))
364+
335365
def test_write_points_udp(self):
336366
"""Test write points UDP for TestInfluxDBClient object."""
337367
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

‎influxdb/tests/server_tests/client_test_with_server.py

Copy file name to clipboardExpand all lines: influxdb/tests/server_tests/client_test_with_server.py
+27Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,6 +452,33 @@ def test_write_points_batch(self):
452452
self.assertIn(12, net_out['series'][0]['values'][0])
453453
self.assertIn(12.34, cpu['series'][0]['values'][0])
454454

455+
def test_write_points_batch_generator(self):
456+
"""Test writing points in a batch from a generator."""
457+
dummy_points = [
458+
{"measurement": "cpu_usage", "tags": {"unit": "percent"},
459+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.34}},
460+
{"measurement": "network", "tags": {"direction": "in"},
461+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 123.00}},
462+
{"measurement": "network", "tags": {"direction": "out"},
463+
"time": "2009-11-10T23:00:00Z", "fields": {"value": 12.00}}
464+
]
465+
dummy_points_generator = (point for point in dummy_points)
466+
self.cli.write_points(points=dummy_points_generator,
467+
tags={"host": "server01",
468+
"region": "us-west"},
469+
batch_size=2)
470+
time.sleep(5)
471+
net_in = self.cli.query("SELECT value FROM network "
472+
"WHERE direction=$dir",
473+
bind_params={'dir': 'in'}
474+
).raw
475+
net_out = self.cli.query("SELECT value FROM network "
476+
"WHERE direction='out'").raw
477+
cpu = self.cli.query("SELECT value FROM cpu_usage").raw
478+
self.assertIn(123, net_in['series'][0]['values'][0])
479+
self.assertIn(12, net_out['series'][0]['values'][0])
480+
self.assertIn(12.34, cpu['series'][0]['values'][0])
481+
455482
def test_query(self):
456483
"""Test querying data back from server."""
457484
self.assertIs(True, self.cli.write_points(dummy_point))

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.