Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit 7debaca

Browse filesBrowse files
gakeraviau
authored andcommitted
Merge pull request #354 from gaker/remove-cluster-client (Thanks @gaker!)
Remove cluster client
1 parent 9a2caa8 commit 7debaca
Copy full SHA for 7debaca

File tree

5 files changed

+1
-319
lines changed
Filter options

5 files changed

+1
-319
lines changed

‎README.rst

Copy file name to clipboardExpand all lines: README.rst
-14Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,6 @@ Here's a basic example (for more see the examples directory)::
108108

109109
>>> print("Result: {0}".format(result))
110110

111-
If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``::
112-
113-
$ python
114-
115-
>>> from influxdb import InfluxDBClusterClient
116-
117-
>>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086),
118-
('192.168.0.2', 8086),
119-
('192.168.0.3', 8086)],
120-
username='root',
121-
password='root',
122-
database='example')
123-
124-
``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients.
125111

126112
Testing
127113
=======

‎docs/source/api-documentation.rst

Copy file name to clipboardExpand all lines: docs/source/api-documentation.rst
-10Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,6 @@ These clients are initiated in the same way as the
4545
:members:
4646
:undoc-members:
4747

48-
------------------------------
49-
:class:`InfluxDBClusterClient`
50-
------------------------------
51-
52-
53-
.. currentmodule:: influxdb.InfluxDBClusterClient
54-
.. autoclass:: influxdb.InfluxDBClusterClient
55-
:members:
56-
:undoc-members:
57-
5848
------------------------
5949
:class:`DataFrameClient`
6050
------------------------

‎influxdb/__init__.py

Copy file name to clipboardExpand all lines: influxdb/__init__.py
-2Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66
from __future__ import unicode_literals
77

88
from .client import InfluxDBClient
9-
from .client import InfluxDBClusterClient
109
from .dataframe_client import DataFrameClient
1110
from .helper import SeriesHelper
1211

1312

1413
__all__ = [
1514
'InfluxDBClient',
16-
'InfluxDBClusterClient',
1715
'DataFrameClient',
1816
'SeriesHelper',
1917
]

‎influxdb/client.py

Copy file name to clipboardExpand all lines: influxdb/client.py
-168Lines changed: 0 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
from __future__ import print_function
88
from __future__ import unicode_literals
99

10-
from functools import wraps
1110
import json
1211
import socket
13-
import time
14-
import threading
15-
import random
1612
import requests
1713
import requests.exceptions
1814
from sys import version_info
@@ -114,8 +110,6 @@ def __init__(self,
114110
'Accept': 'text/plain'
115111
}
116112

117-
# _baseurl, _host and _port are properties to allow InfluxDBClusterClient
118-
# to override them with thread-local variables
119113
@property
120114
def _baseurl(self):
121115
return self._get_baseurl()
@@ -753,168 +747,6 @@ def send_packet(self, packet):
753747
self.udp_socket.sendto(data, (self._host, self.udp_port))
754748

755749

756-
class InfluxDBClusterClient(object):
757-
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
758-
to a cluster of InfluxDB servers. Each query hits different host from the
759-
list of hosts.
760-
761-
:param hosts: all hosts to be included in the cluster, each of which
762-
should be in the format (address, port),
763-
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to
764-
[('localhost', 8086)]
765-
:type hosts: list of tuples
766-
:param shuffle: whether the queries should hit servers evenly(randomly),
767-
defaults to True
768-
:type shuffle: bool
769-
:param client_base_class: the base class for the cluster client.
770-
This parameter is used to enable the support of different client
771-
types. Defaults to :class:`~.InfluxDBClient`
772-
:param healing_delay: the delay in seconds, counting from last failure of
773-
a server, before re-adding server to the list of working servers.
774-
Defaults to 15 minutes (900 seconds)
775-
"""
776-
777-
def __init__(self,
778-
hosts=[('localhost', 8086)],
779-
username='root',
780-
password='root',
781-
database=None,
782-
ssl=False,
783-
verify_ssl=False,
784-
timeout=None,
785-
use_udp=False,
786-
udp_port=4444,
787-
shuffle=True,
788-
client_base_class=InfluxDBClient,
789-
healing_delay=900,
790-
):
791-
self.clients = [self] # Keep it backwards compatible
792-
self.hosts = hosts
793-
self.bad_hosts = [] # Corresponding server has failures in history
794-
self.shuffle = shuffle
795-
self.healing_delay = healing_delay
796-
self._last_healing = time.time()
797-
host, port = self.hosts[0]
798-
self._hosts_lock = threading.Lock()
799-
self._thread_local = threading.local()
800-
self._client = client_base_class(host=host,
801-
port=port,
802-
username=username,
803-
password=password,
804-
database=database,
805-
ssl=ssl,
806-
verify_ssl=verify_ssl,
807-
timeout=timeout,
808-
use_udp=use_udp,
809-
udp_port=udp_port)
810-
for method in dir(client_base_class):
811-
orig_attr = getattr(client_base_class, method, '')
812-
if method.startswith('_') or not callable(orig_attr):
813-
continue
814-
815-
setattr(self, method, self._make_func(orig_attr))
816-
817-
self._client._get_host = self._get_host
818-
self._client._get_port = self._get_port
819-
self._client._get_baseurl = self._get_baseurl
820-
self._update_client_host(self.hosts[0])
821-
822-
@staticmethod
823-
def from_DSN(dsn, client_base_class=InfluxDBClient,
824-
shuffle=True, **kwargs):
825-
"""Same as :meth:`~.InfluxDBClient.from_DSN`, but supports
826-
multiple servers.
827-
828-
:param shuffle: whether the queries should hit servers
829-
evenly(randomly), defaults to True
830-
:type shuffle: bool
831-
:param client_base_class: the base class for all clients in the
832-
cluster. This parameter is used to enable the support of
833-
different client types. Defaults to :class:`~.InfluxDBClient`
834-
835-
:Example:
836-
837-
::
838-
839-
>> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\
840-
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
841-
>> type(cluster)
842-
<class 'influxdb.client.InfluxDBClusterClient'>
843-
>> cluster.hosts
844-
[('host1', 8086), ('host2', 8086)]
845-
>> cluster._client
846-
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
847-
"""
848-
init_args = parse_dsn(dsn)
849-
init_args.update(**kwargs)
850-
init_args['shuffle'] = shuffle
851-
init_args['client_base_class'] = client_base_class
852-
cluster_client = InfluxDBClusterClient(**init_args)
853-
return cluster_client
854-
855-
def _update_client_host(self, host):
856-
self._thread_local.host, self._thread_local.port = host
857-
self._thread_local.baseurl = "{0}://{1}:{2}".format(
858-
self._client._scheme,
859-
self._client._host,
860-
self._client._port
861-
)
862-
863-
def _get_baseurl(self):
864-
return self._thread_local.baseurl
865-
866-
def _get_host(self):
867-
return self._thread_local.host
868-
869-
def _get_port(self):
870-
return self._thread_local.port
871-
872-
def _make_func(self, orig_func):
873-
874-
@wraps(orig_func)
875-
def func(*args, **kwargs):
876-
now = time.time()
877-
with self._hosts_lock:
878-
if (self.bad_hosts and
879-
self._last_healing + self.healing_delay < now):
880-
h = self.bad_hosts.pop(0)
881-
self.hosts.append(h)
882-
self._last_healing = now
883-
884-
if self.shuffle:
885-
random.shuffle(self.hosts)
886-
887-
hosts = self.hosts + self.bad_hosts
888-
889-
for h in hosts:
890-
bad_host = False
891-
try:
892-
self._update_client_host(h)
893-
return orig_func(self._client, *args, **kwargs)
894-
except InfluxDBClientError as e:
895-
# Errors caused by user's requests, re-raise
896-
raise e
897-
except ValueError as e:
898-
raise e
899-
except Exception as e:
900-
# Errors that might caused by server failure, try another
901-
bad_host = True
902-
with self._hosts_lock:
903-
if h in self.hosts:
904-
self.hosts.remove(h)
905-
self.bad_hosts.append(h)
906-
self._last_healing = now
907-
finally:
908-
with self._hosts_lock:
909-
if not bad_host and h in self.bad_hosts:
910-
self.bad_hosts.remove(h)
911-
self.hosts.append(h)
912-
913-
raise InfluxDBServerError("InfluxDB: no viable server!")
914-
915-
return func
916-
917-
918750
def parse_dsn(dsn):
919751
conn_params = urlparse(dsn)
920752
init_args = {}

‎influxdb/tests/client_test.py

Copy file name to clipboardExpand all lines: influxdb/tests/client_test.py
+1-125Lines changed: 1 addition & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import requests
2424
import requests.exceptions
2525
import socket
26-
import time
2726
import requests_mock
2827
import random
2928
from nose.tools import raises
@@ -32,8 +31,7 @@
3231
import mock
3332
import unittest
3433

35-
from influxdb import InfluxDBClient, InfluxDBClusterClient
36-
from influxdb.client import InfluxDBServerError
34+
from influxdb import InfluxDBClient
3735

3836

3937
def _build_response_object(status_code=200, content=""):
@@ -813,125 +811,3 @@ def query(self,
813811
raise Exception("Fail Twice")
814812
else:
815813
return "Success"
816-
817-
818-
class TestInfluxDBClusterClient(unittest.TestCase):
819-
820-
def setUp(self):
821-
# By default, raise exceptions on warnings
822-
warnings.simplefilter('error', FutureWarning)
823-
824-
self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]
825-
self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db'
826-
827-
def test_init(self):
828-
cluster = InfluxDBClusterClient(hosts=self.hosts,
829-
username='username',
830-
password='password',
831-
database='database',
832-
shuffle=False,
833-
client_base_class=FakeClient)
834-
self.assertEqual(3, len(cluster.hosts))
835-
self.assertEqual(0, len(cluster.bad_hosts))
836-
self.assertIn((cluster._client._host,
837-
cluster._client._port), cluster.hosts)
838-
839-
def test_one_server_fails(self):
840-
cluster = InfluxDBClusterClient(hosts=self.hosts,
841-
database='database',
842-
shuffle=False,
843-
client_base_class=FakeClient)
844-
self.assertEqual('Success', cluster.query('Fail once'))
845-
self.assertEqual(2, len(cluster.hosts))
846-
self.assertEqual(1, len(cluster.bad_hosts))
847-
848-
def test_two_servers_fail(self):
849-
cluster = InfluxDBClusterClient(hosts=self.hosts,
850-
database='database',
851-
shuffle=False,
852-
client_base_class=FakeClient)
853-
self.assertEqual('Success', cluster.query('Fail twice'))
854-
self.assertEqual(1, len(cluster.hosts))
855-
self.assertEqual(2, len(cluster.bad_hosts))
856-
857-
def test_all_fail(self):
858-
cluster = InfluxDBClusterClient(hosts=self.hosts,
859-
database='database',
860-
shuffle=True,
861-
client_base_class=FakeClient)
862-
with self.assertRaises(InfluxDBServerError):
863-
cluster.query('Fail')
864-
self.assertEqual(0, len(cluster.hosts))
865-
self.assertEqual(3, len(cluster.bad_hosts))
866-
867-
def test_all_good(self):
868-
cluster = InfluxDBClusterClient(hosts=self.hosts,
869-
database='database',
870-
shuffle=True,
871-
client_base_class=FakeClient)
872-
self.assertEqual('Success', cluster.query(''))
873-
self.assertEqual(3, len(cluster.hosts))
874-
self.assertEqual(0, len(cluster.bad_hosts))
875-
876-
def test_recovery(self):
877-
cluster = InfluxDBClusterClient(hosts=self.hosts,
878-
database='database',
879-
shuffle=True,
880-
client_base_class=FakeClient)
881-
with self.assertRaises(InfluxDBServerError):
882-
cluster.query('Fail')
883-
self.assertEqual('Success', cluster.query(''))
884-
self.assertEqual(1, len(cluster.hosts))
885-
self.assertEqual(2, len(cluster.bad_hosts))
886-
887-
def test_healing(self):
888-
cluster = InfluxDBClusterClient(hosts=self.hosts,
889-
database='database',
890-
shuffle=True,
891-
healing_delay=1,
892-
client_base_class=FakeClient)
893-
with self.assertRaises(InfluxDBServerError):
894-
cluster.query('Fail')
895-
self.assertEqual('Success', cluster.query(''))
896-
time.sleep(1.1)
897-
self.assertEqual('Success', cluster.query(''))
898-
self.assertEqual(2, len(cluster.hosts))
899-
self.assertEqual(1, len(cluster.bad_hosts))
900-
time.sleep(1.1)
901-
self.assertEqual('Success', cluster.query(''))
902-
self.assertEqual(3, len(cluster.hosts))
903-
self.assertEqual(0, len(cluster.bad_hosts))
904-
905-
def test_dsn(self):
906-
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
907-
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
908-
self.assertEqual('http://host1:8086', cli._client._baseurl)
909-
self.assertEqual('uSr', cli._client._username)
910-
self.assertEqual('pWd', cli._client._password)
911-
self.assertEqual('db', cli._client._database)
912-
self.assertFalse(cli._client.use_udp)
913-
914-
cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string)
915-
self.assertTrue(cli._client.use_udp)
916-
917-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string)
918-
self.assertEqual('https://host1:8086', cli._client._baseurl)
919-
920-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string,
921-
**{'ssl': False})
922-
self.assertEqual('http://host1:8086', cli._client._baseurl)
923-
924-
def test_dsn_password_caps(self):
925-
cli = InfluxDBClusterClient.from_DSN(
926-
'https+influxdb://usr:pWd@host:8086/db')
927-
self.assertEqual('pWd', cli._client._password)
928-
929-
def test_dsn_mixed_scheme_case(self):
930-
cli = InfluxDBClusterClient.from_DSN(
931-
'hTTps+inFLUxdb://usr:pWd@host:8086/db')
932-
self.assertEqual('pWd', cli._client._password)
933-
self.assertEqual('https://host:8086', cli._client._baseurl)
934-
935-
cli = InfluxDBClusterClient.from_DSN(
936-
'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db')
937-
self.assertTrue(cli._client.use_udp)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.