Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 4792fd5

Browse filesBrowse files
authored
fix: add support for "with .. as .." statement for cleaner exception … (influxdata#218)
1 parent d465dda commit 4792fd5
Copy full SHA for 4792fd5

15 files changed

+504
-502
lines changed

‎README.rst

Copy file name to clipboardExpand all lines: README.rst
+63-67Lines changed: 63 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ Writes
220220
The `WriteApi <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py>`_ supports synchronous, asynchronous and batching writes into InfluxDB 2.0.
221221
The data should be passed as a `InfluxDB Line Protocol <https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/>`_\ , `Data Point <https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py>`_ or Observable stream.
222222

223-
**Important: The WriteApi in batching mode (default mode) is suppose to run as a singleton. To flush all your data you should call ``_write_client.close()`` at the end of your script.**
223+
**Important: The WriteApi in batching mode (default mode) is suppose to run as a singleton. To flush all your data you should wrap the execution using ``with client.write_api(...) as write_api:`` statement or call ``_write_client.close()`` at the end of your script.**
224224

225225
*The default instance of WriteApi use batching.*
226226

@@ -280,73 +280,69 @@ The batching is configurable by ``write_options``\ :
280280
281281
from influxdb_client import InfluxDBClient, Point, WriteOptions
282282
283-
_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
284-
_write_client = _client.write_api(write_options=WriteOptions(batch_size=500,
285-
flush_interval=10_000,
286-
jitter_interval=2_000,
287-
retry_interval=5_000,
288-
max_retries=5,
289-
max_retry_delay=30_000,
290-
exponential_base=2))
283+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:
284+
285+
with _client.write_api(write_options=WriteOptions(batch_size=500,
286+
flush_interval=10_000,
287+
jitter_interval=2_000,
288+
retry_interval=5_000,
289+
max_retries=5,
290+
max_retry_delay=30_000,
291+
exponential_base=2)) as _write_client:
292+
293+
"""
294+
Write Line Protocol formatted as string
295+
"""
296+
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
297+
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
298+
"h2o_feet,location=coyote_creek water_level=3.0 3"])
299+
300+
"""
301+
Write Line Protocol formatted as byte array
302+
"""
303+
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
304+
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
305+
"h2o_feet,location=coyote_creek water_level=3.0 3".encode()])
306+
307+
"""
308+
Write Dictionary-style object
309+
"""
310+
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
311+
"fields": {"water_level": 1.0}, "time": 1})
312+
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
313+
"fields": {"water_level": 2.0}, "time": 2},
314+
{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
315+
"fields": {"water_level": 3.0}, "time": 3}])
316+
317+
"""
318+
Write Data Point
319+
"""
320+
_write_client.write("my-bucket", "my-org",
321+
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
322+
_write_client.write("my-bucket", "my-org",
323+
[Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
324+
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])
325+
326+
"""
327+
Write Observable stream
328+
"""
329+
_data = rx \
330+
.range(7, 11) \
331+
.pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))
332+
333+
_write_client.write("my-bucket", "my-org", _data)
334+
335+
"""
336+
Write Pandas DataFrame
337+
"""
338+
_now = datetime.now(UTC)
339+
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
340+
index=[_now, _now + timedelta(hours=1)],
341+
columns=["location", "water_level"])
342+
343+
_write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
344+
data_frame_tag_columns=['location'])
291345
292-
"""
293-
Write Line Protocol formatted as string
294-
"""
295-
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
296-
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
297-
"h2o_feet,location=coyote_creek water_level=3.0 3"])
298-
299-
"""
300-
Write Line Protocol formatted as byte array
301-
"""
302-
_write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
303-
_write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
304-
"h2o_feet,location=coyote_creek water_level=3.0 3".encode()])
305-
306-
"""
307-
Write Dictionary-style object
308-
"""
309-
_write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
310-
"fields": {"water_level": 1.0}, "time": 1})
311-
_write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
312-
"fields": {"water_level": 2.0}, "time": 2},
313-
{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
314-
"fields": {"water_level": 3.0}, "time": 3}])
315-
316-
"""
317-
Write Data Point
318-
"""
319-
_write_client.write("my-bucket", "my-org",
320-
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
321-
_write_client.write("my-bucket", "my-org",
322-
[Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
323-
Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])
324-
325-
"""
326-
Write Observable stream
327-
"""
328-
_data = rx \
329-
.range(7, 11) \
330-
.pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))
331-
332-
_write_client.write("my-bucket", "my-org", _data)
333-
334-
"""
335-
Write Pandas DataFrame
336-
"""
337-
_now = datetime.now(UTC)
338-
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
339-
index=[_now, _now + timedelta(hours=1)],
340-
columns=["location", "water_level"])
341-
342-
_write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
343-
data_frame_tag_columns=['location'])
344-
345-
"""
346-
Close client
347-
"""
348-
_write_client.close()
349-
_client.close()
350346
351347
352348
Default Tags

‎examples/buckets_management.py

Copy file name to clipboardExpand all lines: examples/buckets_management.py
+34-35Lines changed: 34 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,39 +10,38 @@
1010
url = "http://localhost:8086"
1111
token = "my-token"
1212

13-
client = InfluxDBClient(url=url, token=token)
14-
buckets_api = client.buckets_api()
13+
with InfluxDBClient(url=url, token=token) as client:
14+
buckets_api = client.buckets_api()
15+
16+
"""
17+
The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
18+
"""
19+
org_name = "my-org"
20+
org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]
21+
22+
"""
23+
Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
24+
"""
25+
print(f"------- Create -------\n")
26+
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
27+
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
28+
retention_rules=retention_rules,
29+
org_id=org.id)
30+
print(created_bucket)
31+
32+
"""
33+
List all Buckets
34+
"""
35+
print(f"\n------- List -------\n")
36+
buckets = buckets_api.find_buckets().buckets
37+
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
38+
for bucket in buckets]))
39+
print("---")
40+
41+
"""
42+
Delete previously created bucket
43+
"""
44+
print(f"------- Delete -------\n")
45+
buckets_api.delete_bucket(created_bucket)
46+
print(f" successfully deleted bucket: {created_bucket.name}")
1547

16-
"""
17-
The Bucket API uses as a parameter the Organization ID. We have to retrieve ID by Organization API.
18-
"""
19-
org_name = "my-org"
20-
org = list(filter(lambda it: it.name == org_name, client.organizations_api().find_organizations()))[0]
21-
22-
"""
23-
Create Bucket with retention policy set to 3600 seconds and name "bucket-by-python"
24-
"""
25-
print(f"------- Create -------\n")
26-
retention_rules = BucketRetentionRules(type="expire", every_seconds=3600)
27-
created_bucket = buckets_api.create_bucket(bucket_name="bucket-by-python",
28-
retention_rules=retention_rules,
29-
org_id=org.id)
30-
print(created_bucket)
31-
32-
"""
33-
List all Buckets
34-
"""
35-
print(f"\n------- List -------\n")
36-
buckets = buckets_api.find_buckets().buckets
37-
print("\n".join([f" ---\n ID: {bucket.id}\n Name: {bucket.name}\n Retention: {bucket.retention_rules}"
38-
for bucket in buckets]))
39-
print("---")
40-
41-
"""
42-
Delete previously created bucket
43-
"""
44-
print(f"------- Delete -------\n")
45-
buckets_api.delete_bucket(created_bucket)
46-
print(f" successfully deleted bucket: {created_bucket.name}")
47-
48-
client.close()

‎examples/example.py

Copy file name to clipboardExpand all lines: examples/example.py
+34-36Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,39 +4,37 @@
44
from influxdb_client import WritePrecision, InfluxDBClient, Point
55
from influxdb_client.client.write_api import SYNCHRONOUS
66

7-
bucket = "my-bucket"
8-
9-
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
10-
11-
write_api = client.write_api(write_options=SYNCHRONOUS)
12-
query_api = client.query_api()
13-
14-
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.now(), WritePrecision.MS)
15-
16-
# write using point structure
17-
write_api.write(bucket=bucket, record=p)
18-
19-
line_protocol = p.to_line_protocol()
20-
print(line_protocol)
21-
22-
# write using line protocol string
23-
write_api.write(bucket=bucket, record=line_protocol)
24-
25-
# using Table structure
26-
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -1m)')
27-
for table in tables:
28-
print(table)
29-
for record in table.records:
30-
# process record
31-
print(record.values)
32-
33-
# using csv library
34-
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
35-
val_count = 0
36-
for record in csv_result:
37-
for cell in record:
38-
val_count += 1
39-
print("val count: ", val_count)
40-
41-
response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
42-
print (codecs.decode(response.data))
7+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=False) as client:
8+
query_api = client.query_api()
9+
10+
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(datetime.utcnow(),
11+
WritePrecision.MS)
12+
write_api = client.write_api(write_options=SYNCHRONOUS)
13+
14+
# write using point structure
15+
write_api.write(bucket="my-bucket", record=p)
16+
17+
line_protocol = p.to_line_protocol()
18+
print(line_protocol)
19+
20+
# write using line protocol string
21+
write_api.write(bucket="my-bucket", record=line_protocol)
22+
23+
# using Table structure
24+
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
25+
for table in tables:
26+
print(table)
27+
for record in table.records:
28+
# process record
29+
print(record.values)
30+
31+
# using csv library
32+
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
33+
val_count = 0
34+
for record in csv_result:
35+
for cell in record:
36+
val_count += 1
37+
print("val count: ", val_count)
38+
39+
response = query_api.query_raw('from(bucket:"my-bucket") |> range(start: -10m)')
40+
print (codecs.decode(response.data))

‎examples/import_data_set.py

Copy file name to clipboardExpand all lines: examples/import_data_set.py
+26-32Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -60,39 +60,33 @@ def parse_row(row: OrderedDict):
6060
.from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
6161
.pipe(ops.map(lambda row: parse_row(row)))
6262

63-
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
63+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True) as client:
6464

65-
"""
66-
Create client that writes data in batches with 50_000 items.
67-
"""
68-
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
69-
70-
"""
71-
Write data into InfluxDB
72-
"""
73-
write_api.write(bucket="my-bucket", record=data)
74-
write_api.close()
65+
"""
66+
Create client that writes data in batches with 50_000 items.
67+
"""
68+
with client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000)) as write_api:
7569

76-
"""
77-
Querying max value of CBOE Volatility Index
78-
"""
79-
query = 'from(bucket:"my-bucket")' \
80-
' |> range(start: 0, stop: now())' \
81-
' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
82-
' |> max()'
83-
result = client.query_api().query(query=query)
70+
"""
71+
Write data into InfluxDB
72+
"""
73+
write_api.write(bucket="my-bucket", record=data)
8474

85-
"""
86-
Processing results
87-
"""
88-
print()
89-
print("=== results ===")
90-
print()
91-
for table in result:
92-
for record in table.records:
93-
print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
75+
"""
76+
Querying max value of CBOE Volatility Index
77+
"""
78+
query = 'from(bucket:"my-bucket")' \
79+
' |> range(start: 0, stop: now())' \
80+
' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
81+
' |> max()'
82+
result = client.query_api().query(query=query)
9483

95-
"""
96-
Close client
97-
"""
98-
client.close()
84+
"""
85+
Processing results
86+
"""
87+
print()
88+
print("=== results ===")
89+
print()
90+
for table in result:
91+
for record in table.records:
92+
print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.