Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 922477f

Browse filesBrowse files
authored
feat: add MultiprocessingWriter to help user write data in independent OS process (influxdata#356)
1 parent 929ffc9 commit 922477f
Copy full SHA for 922477f

File tree

5 files changed

+296
-1
lines changed
Filter options

5 files changed

+296
-1
lines changed

‎CHANGELOG.md

Copy file name to clipboardExpand all lines: CHANGELOG.md
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
- `BucketsApi` - add possibility to: `update`
66
- `OrganizationsApi` - add possibility to: `update`
77
- `UsersApi` - add possibility to: `update`, `delete`, `find`
8+
1. [#356](https://github.com/influxdata/influxdb-client-python/pull/356): Add `MultiprocessingWriter` to write data in independent OS process
89

910
### Bug Fixes
1011
1. [#359](https://github.com/influxdata/influxdb-client-python/pull/359): Correct serialization empty columns into LineProtocol [DataFrame]

‎docs/api.rst

Copy file name to clipboardExpand all lines: docs/api.rst
+12Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,15 @@ DeleteApi
6969

7070
.. autoclass:: influxdb_client.domain.DeletePredicateRequest
7171
:members:
72+
73+
Helpers
74+
"""""""
75+
.. autoclass:: influxdb_client.client.util.date_utils.DateHelper
76+
:members:
77+
78+
.. autoclass:: influxdb_client.client.util.date_utils_pandas.PandasDateTimeHelper
79+
:members:
80+
81+
.. autoclass:: influxdb_client.client.util.multiprocessing_helper.MultiprocessingWriter
82+
:members:
83+
+205Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
"""
2+
Helpers classes to make easier use the client in multiprocessing environment.
3+
4+
For more information how the multiprocessing works see Python's
5+
`reference docs <https://docs.python.org/3/library/multiprocessing.html>`_.
6+
"""
7+
import logging
8+
import multiprocessing
9+
10+
from influxdb_client import InfluxDBClient, WriteOptions
11+
from influxdb_client.client.exceptions import InfluxDBError
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
def _success_callback(conf: (str, str, str), data: str):
17+
"""Successfully writen batch."""
18+
logger.debug(f"Written batch: {conf}, data: {data}")
19+
20+
21+
def _error_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
22+
"""Unsuccessfully writen batch."""
23+
logger.debug(f"Cannot write batch: {conf}, data: {data} due: {exception}")
24+
25+
26+
def _retry_callback(conf: (str, str, str), data: str, exception: InfluxDBError):
27+
"""Retryable error."""
28+
logger.debug(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
29+
30+
31+
class _PoisonPill:
32+
"""To notify process to terminate."""
33+
34+
pass
35+
36+
37+
class MultiprocessingWriter(multiprocessing.Process):
38+
"""
39+
The Helper class to write data into InfluxDB in independent OS process.
40+
41+
Example:
42+
.. code-block:: python
43+
44+
from influxdb_client import WriteOptions
45+
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
46+
47+
48+
def main():
49+
writer = MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
50+
write_options=WriteOptions(batch_size=100))
51+
writer.start()
52+
53+
for x in range(1, 1000):
54+
writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
55+
56+
writer.__del__()
57+
58+
59+
if __name__ == '__main__':
60+
main()
61+
62+
63+
How to use with context_manager:
64+
.. code-block:: python
65+
66+
from influxdb_client import WriteOptions
67+
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
68+
69+
70+
def main():
71+
with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
72+
write_options=WriteOptions(batch_size=100)) as writer:
73+
for x in range(1, 1000):
74+
writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
75+
76+
77+
if __name__ == '__main__':
78+
main()
79+
80+
81+
How to handle batch events:
82+
.. code-block:: python
83+
84+
from influxdb_client import WriteOptions
85+
from influxdb_client.client.exceptions import InfluxDBError
86+
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
87+
88+
89+
class BatchingCallback(object):
90+
91+
def success(self, conf: (str, str, str), data: str):
92+
print(f"Written batch: {conf}, data: {data}")
93+
94+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
95+
print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
96+
97+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
98+
print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
99+
100+
101+
def main():
102+
callback = BatchingCallback()
103+
with MultiprocessingWriter(url="http://localhost:8086", token="my-token", org="my-org",
104+
success_callback=callback.success,
105+
error_callback=callback.error,
106+
retry_callback=callback.retry) as writer:
107+
108+
for x in range(1, 1000):
109+
writer.write(bucket="my-bucket", record=f"mem,tag=a value={x}i {x}")
110+
111+
112+
if __name__ == '__main__':
113+
main()
114+
115+
116+
"""
117+
118+
__started__ = False
119+
__disposed__ = False
120+
121+
def __init__(self, **kwargs) -> None:
122+
"""
123+
Initialize defaults.
124+
125+
For more information how to initialize the writer see the examples above.
126+
127+
:param kwargs: arguments are passed into ``__init__`` function of ``InfluxDBClient`` and ``write_api``.
128+
"""
129+
multiprocessing.Process.__init__(self)
130+
self.kwargs = kwargs
131+
self.client = None
132+
self.write_api = None
133+
self.queue_ = multiprocessing.Manager().Queue()
134+
135+
def write(self, **kwargs) -> None:
136+
"""
137+
Append time-series data into underlying queue.
138+
139+
For more information how to pass arguments see the examples above.
140+
141+
:param kwargs: arguments are passed into ``write`` function of ``WriteApi``
142+
:return: None
143+
"""
144+
assert self.__disposed__ is False, 'Cannot write data: the writer is closed.'
145+
assert self.__started__ is True, 'Cannot write data: the writer is not started.'
146+
self.queue_.put(kwargs)
147+
148+
def run(self):
149+
"""Initialize ``InfluxDBClient`` and waits for data to writes into InfluxDB."""
150+
# Initialize Client and Write API
151+
self.client = InfluxDBClient(**self.kwargs)
152+
self.write_api = self.client.write_api(write_options=self.kwargs.get('write_options', WriteOptions()),
153+
success_callback=self.kwargs.get('success_callback', _success_callback),
154+
error_callback=self.kwargs.get('error_callback', _error_callback),
155+
retry_callback=self.kwargs.get('retry_callback', _retry_callback))
156+
# Infinite loop - until poison pill
157+
while True:
158+
next_record = self.queue_.get()
159+
if type(next_record) is _PoisonPill:
160+
# Poison pill means break the loop
161+
self.terminate()
162+
self.queue_.task_done()
163+
break
164+
self.write_api.write(**next_record)
165+
self.queue_.task_done()
166+
167+
def start(self) -> None:
168+
"""Start independent process for writing data into InfluxDB."""
169+
super().start()
170+
self.__started__ = True
171+
172+
def terminate(self) -> None:
173+
"""
174+
Cleanup resources in independent process.
175+
176+
This function **cannot be used** to terminate the ``MultiprocessingWriter``.
177+
If you want to finish your writes please call: ``__del__``.
178+
"""
179+
if self.write_api:
180+
logger.info("flushing data...")
181+
self.write_api.__del__()
182+
self.write_api = None
183+
if self.client:
184+
self.client.__del__()
185+
self.client = None
186+
logger.info("closed")
187+
188+
def __enter__(self):
189+
"""Enter the runtime context related to this object."""
190+
self.start()
191+
return self
192+
193+
def __exit__(self, exc_type, exc_value, traceback):
194+
"""Exit the runtime context related to this object."""
195+
self.__del__()
196+
197+
def __del__(self):
198+
"""Dispose the client and write_api."""
199+
if self.__started__:
200+
self.queue_.put(_PoisonPill())
201+
self.queue_.join()
202+
self.join()
203+
self.queue_ = None
204+
self.__started__ = False
205+
self.__disposed__ = True

‎influxdb_client/client/write_api.py

Copy file name to clipboardExpand all lines: influxdb_client/client/write_api.py
+6-1Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,4 +565,9 @@ def __setstate__(self, state):
565565
"""Set your object with the provided dict."""
566566
self.__dict__.update(state)
567567
# Init Rx
568-
self.__init__(self._influxdb_client, self._write_options, self._point_settings)
568+
self.__init__(self._influxdb_client,
569+
self._write_options,
570+
self._point_settings,
571+
success_callback=self._success_callback,
572+
error_callback=self._error_callback,
573+
retry_callback=self._retry_callback)

‎tests/test_MultiprocessingWriter.py

Copy file name to clipboard
+72Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import os
2+
import unittest
3+
from datetime import datetime
4+
5+
from influxdb_client import WritePrecision, InfluxDBClient
6+
from influxdb_client.client.util.date_utils import get_date_helper
7+
from influxdb_client.client.util.multiprocessing_helper import MultiprocessingWriter
8+
from influxdb_client.client.write_api import SYNCHRONOUS
9+
10+
11+
# noinspection PyMethodMayBeStatic
12+
class MultiprocessingWriterTest(unittest.TestCase):
13+
14+
def setUp(self) -> None:
15+
self.url = os.getenv('INFLUXDB_V2_URL', "http://localhost:8086")
16+
self.token = os.getenv('INFLUXDB_V2_TOKEN', "my-token")
17+
self.org = os.getenv('INFLUXDB_V2_ORG', "my-org")
18+
self.writer = None
19+
20+
def tearDown(self) -> None:
21+
if self.writer:
22+
self.writer.__del__()
23+
24+
def test_write_without_start(self):
25+
self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
26+
write_options=SYNCHRONOUS)
27+
28+
with self.assertRaises(AssertionError) as ve:
29+
self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")
30+
31+
self.assertEqual('Cannot write data: the writer is not started.', f'{ve.exception}')
32+
33+
def test_write_after_terminate(self):
34+
self.writer = MultiprocessingWriter(url=self.url, token=self.token, org=self.org,
35+
write_options=SYNCHRONOUS)
36+
self.writer.start()
37+
self.writer.__del__()
38+
39+
with self.assertRaises(AssertionError) as ve:
40+
self.writer.write(bucket="my-bucket", record=f"mem,tag=a value=5")
41+
42+
self.assertEqual('Cannot write data: the writer is closed.', f'{ve.exception}')
43+
44+
def test_terminate_twice(self):
45+
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
46+
writer.__del__()
47+
writer.terminate()
48+
writer.terminate()
49+
writer.__del__()
50+
51+
def test_use_context_manager(self):
52+
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
53+
self.assertIsNotNone(writer)
54+
55+
def test_pass_parameters(self):
56+
unique = get_date_helper().to_nanoseconds(datetime.utcnow() - datetime.utcfromtimestamp(0))
57+
58+
# write data
59+
with MultiprocessingWriter(url=self.url, token=self.token, org=self.org, write_options=SYNCHRONOUS) as writer:
60+
writer.write(bucket="my-bucket", record=f"mem_{unique},tag=a value=5i 10", write_precision=WritePrecision.S)
61+
62+
# query data
63+
with InfluxDBClient(url=self.url, token=self.token, org=self.org) as client:
64+
query_api = client.query_api()
65+
tables = query_api.query(
66+
f'from(bucket: "my-bucket") |> range(start: 0) |> filter(fn: (r) => r._measurement == "mem_{unique}")',
67+
self.org)
68+
record = tables[0].records[0]
69+
self.assertIsNotNone(record)
70+
self.assertEqual("a", record["tag"])
71+
self.assertEqual(5, record["_value"])
72+
self.assertEqual(get_date_helper().to_utc(datetime.utcfromtimestamp(10)), record["_time"])

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.