From 04c145253c7c489c53d28edf2f26555b43e63056 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 18:32:38 +0300 Subject: [PATCH 01/33] Distributed django.core.cache metrics storage and collection (initial) --- prometheus_client/core.py | 11 +- prometheus_client/distributed.py | 247 ++++++++++++++++++++++ tests/test_distibuted.py | 348 +++++++++++++++++++++++++++++++ 3 files changed, 602 insertions(+), 4 deletions(-) create mode 100644 prometheus_client/distributed.py create mode 100644 tests/test_distibuted.py diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 964947a8..ab3b1bcb 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -2,7 +2,6 @@ from __future__ import unicode_literals -from collections import namedtuple import copy import json import math @@ -11,10 +10,11 @@ import re import struct import sys -from threading import Lock import time -from timeit import default_timer import types +from collections import namedtuple +from threading import Lock +from timeit import default_timer from .decorator import decorate @@ -653,7 +653,6 @@ def _mmap_key(metric_name, name, labelnames, labelvalues): labels = dict(zip(labelnames, labelvalues)) return json.dumps([metric_name, name, labels], sort_keys=True) - def _MultiProcessValue(_pidFunc=os.getpid): files = {} values = [] @@ -727,6 +726,10 @@ def get(self): # no control over we use an environment variable. 
if 'prometheus_multiproc_dir' in os.environ: _ValueClass = _MultiProcessValue() +elif 'prometheus_distributed' in os.environ: + from prometheus_client.distributed import DistributedValue + + _ValueClass = DistributedValue else: _ValueClass = _MutexValue diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py new file mode 100644 index 00000000..a9de257d --- /dev/null +++ b/prometheus_client/distributed.py @@ -0,0 +1,247 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +import inspect +import json +import os +import socket +import time +from collections import defaultdict + +from django.core.cache import cache + +from . import core + +hostname = socket.gethostname() + + +class CacheLock(object): + def __init__(self, lock_id, ttl): + self.id = 'cachelock-{0}'.format(lock_id) + self.ttl = ttl + self.status = False + self.timeout_at = None + + def __nonzero__(self): + return self.status + + def __enter__(self): + trys = 6 + while trys: + self.timeout_at = time.monotonic() + self.ttl - 2 + self.status = cache.add(self.id, 'locked', self.ttl) + if self.status: + return self.status + time.sleep(0.1) + trys -= 1 + raise Exception('Could not lock for {self.id}'.format(**locals())) + + def __exit__(self, type, value, tb): + if self.status: + if time.monotonic() < self.timeout_at: + cache.delete(self.id) + + +distributed_list_cache_key = 'pc_distributed_list' +distributed_list_lock = CacheLock(distributed_list_cache_key, ttl=3) +added_to_distributed_list = set() + + +def add_to_distributed_list(pid): + if not (hostname, pid) in added_to_distributed_list: + with distributed_list_lock: + l = cache.get(distributed_list_cache_key, set()) + l.add((hostname, pid)) + cache.set(distributed_list_cache_key, l, 60 * 20) + + +_pidFunc = os.getpid + +__cached_get_all_typ = None + + +def get_all_typ(): + global __cached_get_all_typ + if __cached_get_all_typ is None: + __cached_get_all_typ = [] + for name, obj in inspect.getmembers(core): + if 
inspect.isclass(obj): + typ = getattr(obj, '_type', None) + if typ: + __cached_get_all_typ.append(typ) + else: + wrapper = getattr(obj, '__wrapped__', None) + if wrapper: + typ = getattr(wrapper, '_type', None) + if typ: + __cached_get_all_typ.append(typ) + __cached_get_all_typ.extend([core.Gauge.__wrapped__._type + '_' + mode for mode in core.Gauge.__wrapped__._MULTIPROC_MODES]) + return __cached_get_all_typ + + +prometheus_cache_key_prefix = 'pcachekey.' + + +def get_cache_key(typ, hostname, pid, prometheus_cache_key_prefix=prometheus_cache_key_prefix): + return '{prometheus_cache_key_prefix}{typ};{hostname};{pid}'.format(**locals()) + + +def deconstruct_cache_key(cache_key): + l = cache_key.replace(prometheus_cache_key_prefix, '').split(';') + return l + + +class DistributedValue(object): + _multiprocess = False + + def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs): + if typ == 'gauge': + typ_prefix = typ + '_' + multiprocess_mode + else: + typ_prefix = typ + self.typ_prefix = typ_prefix + self.valuekey = self._key = core._mmap_key(metric_name, name, labelnames, labelvalues) + self.__reset() + + @property + def lock(self): + return CacheLock(self.cachekey, ttl=3) + + @property + def cachekey(self): + pid = _pidFunc() + add_to_distributed_list(pid) + return get_cache_key(self.typ_prefix, hostname, pid) + + def __get_dict(self): + return cache.get(self.cachekey, {}) + + def __set_dict(self, dict_value): + cache.set(self.cachekey, dict_value, 60 * 10) + + def __reset(self): + with self.lock: + d = self.__get_dict() + if not self.valuekey in d: + d[self.valuekey] = 0 + self.__set_dict(d) + + def inc(self, amount): + with self.lock: + d = self.__get_dict() + d[self.valuekey] = d.get(self.valuekey,0) + amount + self.__set_dict(d) + + def set(self, value): + with self.lock: + d = self.__get_dict() + d[self.valuekey] = value + self.__set_dict(d) + + def get(self): + with self.lock: + return 
self.__get_dict().get(self.valuekey, None) + + +class DistributedCollector(object): + def __init__(self, registry): + if registry: + registry.register(self) + + def collect(self): + cache_keys = set() + distributed_list = cache.get(distributed_list_cache_key, set()) + for hostname, pid in distributed_list: + for typ in get_all_typ(): + cache_keys.add(get_cache_key(typ, hostname, pid)) + + return self.merge(cache.get_many(cache_keys), accumulate=True) + + def merge(self, cache_values, accumulate=True): + """Merge metrics from given mmap files. + + By default, histograms are accumulated, as per prometheus wire format. + But if writing the merged data back to mmap files, use + accumulate=False to avoid compound accumulation. + """ + metrics = {} + for cache_key, cache_value in cache_values.items(): + typ, value_hostname, pid = deconstruct_cache_key(cache_key) + multiprocess_mode = None + if '_' in typ: + typ, multiprocess_mode = typ.split('_') + for key, value in cache_value.items(): + metric_name, name, labels = json.loads(key) + labels_key = tuple(sorted(labels.items())) + + metric = metrics.get(metric_name) + if metric is None: + metric = core.Metric(metric_name, 'Multiprocess metric', typ) + metrics[metric_name] = metric + + if typ == 'gauge': + metric._multiprocess_mode = multiprocess_mode + metric.add_sample(name, labels_key + (('pid', pid), + ('hostname', value_hostname) + ), value) + else: + # The duplicates and labels are fixed in the next for. 
+ metric.add_sample(name, labels_key, value) + + for metric in metrics.values(): + samples = defaultdict(float) + buckets = {} + for s in metric.samples: + name, labels, value = s.name, s.labels, s.value + if metric.type == 'gauge': + without_pid = tuple(l for l in labels if l[0] != 'pid') + if metric._multiprocess_mode == 'min': + current = samples.setdefault((name, without_pid), value) + if value < current: + samples[(s.name, without_pid)] = value + elif metric._multiprocess_mode == 'max': + current = samples.setdefault((name, without_pid), value) + if value > current: + samples[(s.name, without_pid)] = value + elif metric._multiprocess_mode == 'livesum': + samples[(name, without_pid)] += value + else: # all/liveall + samples[(name, labels)] = value + + elif metric.type == 'histogram': + bucket = tuple(float(l[1]) for l in labels if l[0] == 'le') + if bucket: + # _bucket + without_le = tuple(l for l in labels if l[0] != 'le') + buckets.setdefault(without_le, {}) + buckets[without_le].setdefault(bucket[0], 0.0) + buckets[without_le][bucket[0]] += value + else: + # _sum/_count + samples[(s.name, labels)] += value + + else: + # Counter and Summary. + samples[(s.name, labels)] += value + + # Accumulate bucket values. + if metric.type == 'histogram': + for labels, values in buckets.items(): + acc = 0.0 + for bucket, value in sorted(values.items()): + sample_key = ( + metric.name + '_bucket', + labels + (('le', core._floatToGoString(bucket)),), + ) + if accumulate: + acc += value + samples[sample_key] = acc + else: + samples[sample_key] = value + if accumulate: + samples[(metric.name + '_count', labels)] = acc + + # Convert to correct sample format. 
+ metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()] + return metrics.values() diff --git a/tests/test_distibuted.py b/tests/test_distibuted.py new file mode 100644 index 00000000..cab76665 --- /dev/null +++ b/tests/test_distibuted.py @@ -0,0 +1,348 @@ +from __future__ import unicode_literals + +import glob +import os +import shutil +import sys +import tempfile + +import django + +from prometheus_client import core +from prometheus_client.core import ( + CollectorRegistry, Counter, Gauge, Histogram, Sample, Summary, +) +from prometheus_client.multiprocess import ( + mark_process_dead, MultiProcessCollector, +) + +if sys.version_info < (2, 7): + # We need the skip decorators from unittest2 on Python 2.6. + import unittest2 as unittest +else: + import unittest + +from unittest.mock import Mock, patch + + +class TestDistributed(unittest.TestCase): + def patch(self, *args, **kwargs) -> Mock: + p = patch(*args, **kwargs) + mock = p.start() + self.addCleanup(p.stop) + return mock + + def setUp(self): + from django.conf import settings + if not settings.configured: + settings.configure( + CACHES={ + 'default': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + 'LOCATION': 'extra-fast-sqlite-test', + } + } + ) + + self.pid = None + from prometheus_client.distributed import _pidFunc + self._original_pidFunc = _pidFunc + self.patch('prometheus_client.distributed._pidFunc', new=self.pidFunc) + + from prometheus_client.distributed import DistributedValue + core._ValueClass = DistributedValue + self.registry = CollectorRegistry() + from prometheus_client.distributed import DistributedCollector + self.collector = DistributedCollector(self.registry) + + def pidFunc(self): + if self.pid is not None: + return self.pid + return self._original_pidFunc() + + def tearDown(self): + core._ValueClass = core._MutexValue + + def test_counter_adds(self): + c1 = Counter('c', 'help', registry=None) + c2 = Counter('c', 'help', 
registry=None) + self.assertEqual(0, self.registry.get_sample_value('c_total')) + c1.inc(1) + c2.inc(2) + self.assertEqual(3, self.registry.get_sample_value('c_total')) + + def test_summary_adds(self): + s1 = Summary('s', 'help', registry=None) + s2 = Summary('s', 'help', registry=None) + self.assertEqual(0, self.registry.get_sample_value('s_count')) + self.assertEqual(0, self.registry.get_sample_value('s_sum')) + s1.observe(1) + s2.observe(2) + self.assertEqual(2, self.registry.get_sample_value('s_count')) + self.assertEqual(3, self.registry.get_sample_value('s_sum')) + + def test_histogram_adds(self): + h1 = Histogram('h', 'help', registry=None) + h2 = Histogram('h', 'help', registry=None) + self.assertEqual(0, self.registry.get_sample_value('h_count')) + self.assertEqual(0, self.registry.get_sample_value('h_sum')) + self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + h1.observe(1) + h2.observe(2) + self.assertEqual(2, self.registry.get_sample_value('h_count')) + self.assertEqual(3, self.registry.get_sample_value('h_sum')) + self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) + + def test_gauge_all(self): + g1 = Gauge('g', 'help', registry=None) + g2 = Gauge('g', 'help', registry=None) + self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'})) + self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'})) + g1.set(1) + g2.set(2) + mark_process_dead(123, os.environ['prometheus_multiproc_dir']) + self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'})) + self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + + def test_gauge_liveall(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall') + + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall') + self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'})) + self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'})) + g1.set(1) + 
g2.set(2) + self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'})) + self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + mark_process_dead(123, os.environ['prometheus_multiproc_dir']) + self.assertEqual(None, self.registry.get_sample_value('g', {'pid': '123'})) + self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + + def test_gauge_min(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='min') + + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='min') + self.assertEqual(0, self.registry.get_sample_value('g')) + g1.set(1) + g2.set(2) + self.assertEqual(1, self.registry.get_sample_value('g')) + + def test_gauge_max(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='max') + + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='max') + self.assertEqual(0, self.registry.get_sample_value('g')) + g1.set(1) + g2.set(2) + self.assertEqual(2, self.registry.get_sample_value('g')) + + def test_gauge_livesum(self): + g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum') + + g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum') + self.assertEqual(0, self.registry.get_sample_value('g')) + g1.set(1) + g2.set(2) + self.assertEqual(3, self.registry.get_sample_value('g')) + mark_process_dead(123, os.environ['prometheus_multiproc_dir']) + self.assertEqual(2, self.registry.get_sample_value('g')) + + def test_namespace_subsystem(self): + c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss') + c1.inc(1) + self.assertEqual(1, self.registry.get_sample_value('ns_ss_c_total')) + + def test_counter_across_forks(self): + self.pid = 0 + c1 = Counter('c', 'help', registry=None) + self.assertEqual(0, self.registry.get_sample_value('c_total')) + c1.inc(1) + c1.inc(1) + self.pid = 1 + c1.inc(1) + self.assertEqual(3, self.registry.get_sample_value('c_total')) + self.assertEqual(1, c1._value.get()) + + def 
test_initialization_detects_pid_change(self): + self.pid = 0 + + # can not inspect the files cache directly, as it's a closure, so we + # check for the actual files themselves + def files(): + fs = os.listdir(os.environ['prometheus_multiproc_dir']) + fs.sort() + return fs + + c1 = Counter('c1', 'c1', registry=None) + self.assertEqual(files(), ['counter_0.db']) + c2 = Counter('c2', 'c2', registry=None) + self.assertEqual(files(), ['counter_0.db']) + self.pid = 1 + c3 = Counter('c3', 'c3', registry=None) + self.assertEqual(files(), ['counter_0.db', 'counter_1.db']) + + + @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.") + def test_collect(self): + self.pid = 0 + + labels = dict((i, i) for i in 'abcd') + + def add_label(key, value): + l = labels.copy() + l[key] = value + return l + + c = Counter('c', 'help', labelnames=labels.keys(), registry=None) + g = Gauge('g', 'help', labelnames=labels.keys(), registry=None) + h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) + + c.labels(**labels).inc(1) + g.labels(**labels).set(1) + h.labels(**labels).observe(1) + + self.pid = 1 + + c.labels(**labels).inc(1) + g.labels(**labels).set(1) + h.labels(**labels).observe(5) + + metrics = dict((m.name, m) for m in self.collector.collect()) + + self.assertEqual( + metrics['c'].samples, [Sample('c_total', labels, 2.0)] + ) + metrics['g'].samples.sort(key=lambda x: x[1]['pid']) + for sample in metrics['g'].samples: + sample.labels.pop('hostname') + self.assertEqual(metrics['g'].samples, [ + Sample('g', add_label('pid', '0'), 1.0), + Sample('g', add_label('pid', '1'), 1.0), + ]) + + metrics['h'].samples.sort( + key=lambda x: (x[0], float(x[1].get('le', 0))) + ) + expected_histogram = [ + Sample('h_bucket', add_label('le', '0.005'), 0.0), + Sample('h_bucket', add_label('le', '0.01'), 0.0), + Sample('h_bucket', add_label('le', '0.025'), 0.0), + Sample('h_bucket', add_label('le', '0.05'), 0.0), + Sample('h_bucket', add_label('le', '0.075'), 0.0), + 
Sample('h_bucket', add_label('le', '0.1'), 0.0), + Sample('h_bucket', add_label('le', '0.25'), 0.0), + Sample('h_bucket', add_label('le', '0.5'), 0.0), + Sample('h_bucket', add_label('le', '0.75'), 0.0), + Sample('h_bucket', add_label('le', '1.0'), 1.0), + Sample('h_bucket', add_label('le', '2.5'), 1.0), + Sample('h_bucket', add_label('le', '5.0'), 2.0), + Sample('h_bucket', add_label('le', '7.5'), 2.0), + Sample('h_bucket', add_label('le', '10.0'), 2.0), + Sample('h_bucket', add_label('le', '+Inf'), 2.0), + Sample('h_count', labels, 2.0), + Sample('h_sum', labels, 6.0), + ] + + self.assertEqual(metrics['h'].samples, expected_histogram) + + @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.") + def test_merge_no_accumulate(self): + self.pid = 0 + labels = dict((i, i) for i in 'abcd') + + def add_label(key, value): + l = labels.copy() + l[key] = value + return l + + h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) + h.labels(**labels).observe(1) + self.pid = 1 + h.labels(**labels).observe(5) + + path = os.path.join(os.environ['prometheus_multiproc_dir'], '*.db') + files = glob.glob(path) + metrics = dict( + (m.name, m) for m in self.collector.merge(files, accumulate=False) + ) + + metrics['h'].samples.sort( + key=lambda x: (x[0], float(x[1].get('le', 0))) + ) + expected_histogram = [ + Sample('h_bucket', add_label('le', '0.005'), 0.0), + Sample('h_bucket', add_label('le', '0.01'), 0.0), + Sample('h_bucket', add_label('le', '0.025'), 0.0), + Sample('h_bucket', add_label('le', '0.05'), 0.0), + Sample('h_bucket', add_label('le', '0.075'), 0.0), + Sample('h_bucket', add_label('le', '0.1'), 0.0), + Sample('h_bucket', add_label('le', '0.25'), 0.0), + Sample('h_bucket', add_label('le', '0.5'), 0.0), + Sample('h_bucket', add_label('le', '0.75'), 0.0), + Sample('h_bucket', add_label('le', '1.0'), 1.0), + Sample('h_bucket', add_label('le', '2.5'), 0.0), + Sample('h_bucket', add_label('le', '5.0'), 1.0), + Sample('h_bucket', 
add_label('le', '7.5'), 0.0), + Sample('h_bucket', add_label('le', '10.0'), 0.0), + Sample('h_bucket', add_label('le', '+Inf'), 0.0), + Sample('h_sum', labels, 6.0), + ] + + self.assertEqual(metrics['h'].samples, expected_histogram) + + +class TestMmapedDict(unittest.TestCase): + def setUp(self): + fd, self.tempfile = tempfile.mkstemp() + os.close(fd) + self.d = core._MmapedDict(self.tempfile) + + def test_process_restart(self): + self.d.write_value('abc', 123.0) + self.d.close() + self.d = core._MmapedDict(self.tempfile) + self.assertEqual(123, self.d.read_value('abc')) + self.assertEqual([('abc', 123.0)], list(self.d.read_all_values())) + + def test_expansion(self): + key = 'a' * core._INITIAL_MMAP_SIZE + self.d.write_value(key, 123.0) + self.assertEqual([(key, 123.0)], list(self.d.read_all_values())) + + def test_multi_expansion(self): + key = 'a' * core._INITIAL_MMAP_SIZE * 4 + self.d.write_value('abc', 42.0) + self.d.write_value(key, 123.0) + self.d.write_value('def', 17.0) + self.assertEqual( + [('abc', 42.0), (key, 123.0), ('def', 17.0)], + list(self.d.read_all_values())) + + def test_corruption_detected(self): + self.d.write_value('abc', 42.0) + # corrupt the written data + self.d._m[8:16] = b'somejunk' + with self.assertRaises(RuntimeError): + list(self.d.read_all_values()) + + def tearDown(self): + os.unlink(self.tempfile) + + +class TestUnsetEnv(unittest.TestCase): + def setUp(self): + self.registry = CollectorRegistry() + fp, self.tmpfl = tempfile.mkstemp() + os.close(fp) + + def test_unset_syncdir_env(self): + self.assertRaises( + ValueError, MultiProcessCollector, self.registry) + + def test_file_syncpath(self): + registry = CollectorRegistry() + self.assertRaises( + ValueError, MultiProcessCollector, registry, self.tmpfl) + + def tearDown(self): + os.remove(self.tmpfl) From 4d44d7f029f8607260a639df4cd1af7f2555c772 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 18:53:33 +0300 Subject: [PATCH 02/33] import cycle break --- 
prometheus_client/distributed.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index a9de257d..a863f776 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -11,8 +11,6 @@ from django.core.cache import cache -from . import core - hostname = socket.gethostname() @@ -62,6 +60,8 @@ def add_to_distributed_list(pid): def get_all_typ(): + from . import core + global __cached_get_all_typ if __cached_get_all_typ is None: __cached_get_all_typ = [] @@ -101,6 +101,8 @@ def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess else: typ_prefix = typ self.typ_prefix = typ_prefix + from . import core + self.valuekey = self._key = core._mmap_key(metric_name, name, labelnames, labelvalues) self.__reset() @@ -177,6 +179,8 @@ def merge(self, cache_values, accumulate=True): metric = metrics.get(metric_name) if metric is None: + from . import core + metric = core.Metric(metric_name, 'Multiprocess metric', typ) metrics[metric_name] = metric From cf421898f7f90b18bc2b7ebd83d46c80b05eb7b6 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 19:02:00 +0300 Subject: [PATCH 03/33] prevent double hosts list adding --- prometheus_client/distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index a863f776..d5fa3c68 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -52,6 +52,7 @@ def add_to_distributed_list(pid): l = cache.get(distributed_list_cache_key, set()) l.add((hostname, pid)) cache.set(distributed_list_cache_key, l, 60 * 20) + added_to_distributed_list.add((hostname, pid)) _pidFunc = os.getpid From 490efad01963031cf9707d2c99474c15f155510b Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 19:21:33 +0300 Subject: [PATCH 04/33] debugging --- prometheus_client/distributed.py | 10 ++++++++-- 1 file 
changed, 8 insertions(+), 2 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index d5fa3c68..f3d8967f 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -13,7 +13,7 @@ hostname = socket.gethostname() - +in_lock = None class CacheLock(object): def __init__(self, lock_id, ttl): self.id = 'cachelock-{0}'.format(lock_id) @@ -25,17 +25,23 @@ def __nonzero__(self): return self.status def __enter__(self): + global in_lock + if in_lock: + raise Exception('Already on lock ' + in_lock) trys = 6 while trys: self.timeout_at = time.monotonic() + self.ttl - 2 self.status = cache.add(self.id, 'locked', self.ttl) if self.status: + in_lock = self.id return self.status time.sleep(0.1) trys -= 1 raise Exception('Could not lock for {self.id}'.format(**locals())) def __exit__(self, type, value, tb): + global in_lock + in_lock = None if self.status: if time.monotonic() < self.timeout_at: cache.delete(self.id) @@ -104,7 +110,7 @@ def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess self.typ_prefix = typ_prefix from . 
import core - self.valuekey = self._key = core._mmap_key(metric_name, name, labelnames, labelvalues) + self.valuekey = core._mmap_key(metric_name, name, labelnames, labelvalues) self.__reset() @property From 927d817ae3ca000ca2bb5569f37be79f70da5df0 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 19:28:32 +0300 Subject: [PATCH 05/33] small lock ttl fix --- prometheus_client/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index f3d8967f..04589bc6 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -30,7 +30,7 @@ def __enter__(self): raise Exception('Already on lock ' + in_lock) trys = 6 while trys: - self.timeout_at = time.monotonic() + self.ttl - 2 + self.timeout_at = time.monotonic() + self.ttl self.status = cache.add(self.id, 'locked', self.ttl) if self.status: in_lock = self.id From 4716a654d985841b52dd75c7e42161a2d49d9335 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 19:31:34 +0300 Subject: [PATCH 06/33] debug --- prometheus_client/distributed.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 04589bc6..8602525b 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -45,6 +45,8 @@ def __exit__(self, type, value, tb): if self.status: if time.monotonic() < self.timeout_at: cache.delete(self.id) + if self.id in cache: + raise Exception('Id in cache ' + self.id) distributed_list_cache_key = 'pc_distributed_list' From eed4d364db23e802868d682ceddccbde325d6855 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 20:56:57 +0300 Subject: [PATCH 07/33] full tests passing, less locks --- prometheus_client/distributed.py | 53 +++++--- tests/test_distibuted.py | 201 ++++++++++++------------------- 2 files changed, 117 insertions(+), 137 deletions(-) diff --git 
a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 8602525b..061f9bad 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -8,12 +8,16 @@ import socket import time from collections import defaultdict +from threading import Lock from django.core.cache import cache hostname = socket.gethostname() in_lock = None + +lock = Lock() + class CacheLock(object): def __init__(self, lock_id, ttl): self.id = 'cachelock-{0}'.format(lock_id) @@ -63,6 +67,25 @@ def add_to_distributed_list(pid): added_to_distributed_list.add((hostname, pid)) +def remove_from_distributed_list(pid): + with distributed_list_lock: + l = cache.get(distributed_list_cache_key, set()) + + def _iterate(): + if isinstance(pid, int): + yield pid + else: + for p in pid: + yield p + + if (hostname, pid) in l: + l.remove((hostname, pid)) + if (hostname, pid) in added_to_distributed_list: + added_to_distributed_list.remove((hostname, pid)) + + cache.set(distributed_list_cache_key, l, 60 * 20) + + _pidFunc = os.getpid __cached_get_all_typ = None @@ -83,8 +106,8 @@ def get_all_typ(): wrapper = getattr(obj, '__wrapped__', None) if wrapper: typ = getattr(wrapper, '_type', None) - if typ: - __cached_get_all_typ.append(typ) + if typ: + __cached_get_all_typ.append(typ) __cached_get_all_typ.extend([core.Gauge.__wrapped__._type + '_' + mode for mode in core.Gauge.__wrapped__._MULTIPROC_MODES]) return __cached_get_all_typ @@ -106,6 +129,8 @@ class DistributedValue(object): def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs): if typ == 'gauge': + if multiprocess_mode == 'all': + raise Exception('multiprocess_mode=all not supported in distributed storage') typ_prefix = typ + '_' + multiprocess_mode else: typ_prefix = typ @@ -115,10 +140,6 @@ def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess self.valuekey = core._mmap_key(metric_name, name, labelnames, labelvalues) self.__reset() - 
@property - def lock(self): - return CacheLock(self.cachekey, ttl=3) - @property def cachekey(self): pid = _pidFunc() @@ -132,26 +153,26 @@ def __set_dict(self, dict_value): cache.set(self.cachekey, dict_value, 60 * 10) def __reset(self): - with self.lock: + with lock: d = self.__get_dict() if not self.valuekey in d: d[self.valuekey] = 0 self.__set_dict(d) def inc(self, amount): - with self.lock: + with lock: d = self.__get_dict() - d[self.valuekey] = d.get(self.valuekey,0) + amount + d[self.valuekey] = d.get(self.valuekey, 0) + amount self.__set_dict(d) def set(self, value): - with self.lock: + with lock: d = self.__get_dict() d[self.valuekey] = value self.__set_dict(d) def get(self): - with self.lock: + with lock: return self.__get_dict().get(self.valuekey, None) @@ -159,6 +180,7 @@ class DistributedCollector(object): def __init__(self, registry): if registry: registry.register(self) + self.accumulate = True def collect(self): cache_keys = set() @@ -167,7 +189,7 @@ def collect(self): for typ in get_all_typ(): cache_keys.add(get_cache_key(typ, hostname, pid)) - return self.merge(cache.get_many(cache_keys), accumulate=True) + return self.merge(cache.get_many(cache_keys), accumulate=self.accumulate) def merge(self, cache_values, accumulate=True): """Merge metrics from given mmap files. @@ -176,6 +198,7 @@ def merge(self, cache_values, accumulate=True): But if writing the merged data back to mmap files, use accumulate=False to avoid compound accumulation. """ + from . import core metrics = {} for cache_key, cache_value in cache_values.items(): typ, value_hostname, pid = deconstruct_cache_key(cache_key) @@ -188,8 +211,6 @@ def merge(self, cache_values, accumulate=True): metric = metrics.get(metric_name) if metric is None: - from . import core - metric = core.Metric(metric_name, 'Multiprocess metric', typ) metrics[metric_name] = metric @@ -258,3 +279,7 @@ def merge(self, cache_values, accumulate=True): # Convert to correct sample format. 
metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()] return metrics.values() + + +def mark_distributed_process_dead(pid): + remove_from_distributed_list(pid) diff --git a/tests/test_distibuted.py b/tests/test_distibuted.py index cab76665..2cb63884 100644 --- a/tests/test_distibuted.py +++ b/tests/test_distibuted.py @@ -3,8 +3,10 @@ import glob import os import shutil +import socket import sys import tempfile +from random import randint import django @@ -13,7 +15,7 @@ CollectorRegistry, Counter, Gauge, Histogram, Sample, Summary, ) from prometheus_client.multiprocess import ( - mark_process_dead, MultiProcessCollector, + MultiProcessCollector, ) if sys.version_info < (2, 7): @@ -24,6 +26,8 @@ from unittest.mock import Mock, patch +hostname = socket.gethostname() + class TestDistributed(unittest.TestCase): def patch(self, *args, **kwargs) -> Mock: @@ -64,12 +68,12 @@ def tearDown(self): core._ValueClass = core._MutexValue def test_counter_adds(self): - c1 = Counter('c', 'help', registry=None) - c2 = Counter('c', 'help', registry=None) - self.assertEqual(0, self.registry.get_sample_value('c_total')) + c1 = Counter('c2', 'help', registry=None) + c2 = Counter('c2', 'help', registry=None) + self.assertEqual(0, self.registry.get_sample_value('c2_total')) c1.inc(1) c2.inc(2) - self.assertEqual(3, self.registry.get_sample_value('c_total')) + self.assertEqual(3, self.registry.get_sample_value('c2_total')) def test_summary_adds(self): s1 = Summary('s', 'help', registry=None) @@ -94,58 +98,67 @@ def test_histogram_adds(self): self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) def test_gauge_all(self): - g1 = Gauge('g', 'help', registry=None) - g2 = Gauge('g', 'help', registry=None) - self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'})) - self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'})) - g1.set(1) - g2.set(2) - mark_process_dead(123, 
os.environ['prometheus_multiproc_dir']) - self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'})) - self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + self.pid = 123 + with self.assertRaises(Exception) as e: + g1 = Gauge('g1', 'help', registry=None) + self.assertTrue('not supported' in str(e.exception)) def test_gauge_liveall(self): - g1 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall') - - g2 = Gauge('g', 'help', registry=None, multiprocess_mode='liveall') - self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '123'})) - self.assertEqual(0, self.registry.get_sample_value('g', {'pid': '456'})) + self.pid = 123 + g1 = Gauge('g2', 'help', registry=None, multiprocess_mode='liveall') + self.pid = 456 + g2 = Gauge('g2', 'help', registry=None, multiprocess_mode='liveall') + self.assertEqual(0, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'})) + self.assertEqual(0, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'})) + self.pid = 123 g1.set(1) + self.pid = 456 g2.set(2) - self.assertEqual(1, self.registry.get_sample_value('g', {'pid': '123'})) - self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) - mark_process_dead(123, os.environ['prometheus_multiproc_dir']) - self.assertEqual(None, self.registry.get_sample_value('g', {'pid': '123'})) - self.assertEqual(2, self.registry.get_sample_value('g', {'pid': '456'})) + self.assertEqual(1, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'})) + self.assertEqual(2, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'})) + from prometheus_client.distributed import mark_distributed_process_dead + mark_distributed_process_dead(123) + self.assertEqual(None, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'})) + self.assertEqual(2, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'})) def test_gauge_min(self): - g1 = Gauge('g', 
'help', registry=None, multiprocess_mode='min') - - g2 = Gauge('g', 'help', registry=None, multiprocess_mode='min') - self.assertEqual(0, self.registry.get_sample_value('g')) + self.pid = 123 + g1 = Gauge('gm', 'help', registry=None, multiprocess_mode='min') + self.pid = 456 + g2 = Gauge('gm', 'help', registry=None, multiprocess_mode='min') + self.assertEqual(0, self.registry.get_sample_value('gm', {'hostname':hostname})) + self.pid = 123 g1.set(1) + self.pid = 456 g2.set(2) - self.assertEqual(1, self.registry.get_sample_value('g')) + self.assertEqual(1, self.registry.get_sample_value('gm', {'hostname':hostname})) def test_gauge_max(self): - g1 = Gauge('g', 'help', registry=None, multiprocess_mode='max') - - g2 = Gauge('g', 'help', registry=None, multiprocess_mode='max') - self.assertEqual(0, self.registry.get_sample_value('g')) + self.pid = 123 + g1 = Gauge('gmax', 'help', registry=None, multiprocess_mode='max') + self.pid = 456 + g2 = Gauge('gmax', 'help', registry=None, multiprocess_mode='max') + self.assertEqual(0, self.registry.get_sample_value('gmax', {'hostname':hostname})) + self.pid = 123 g1.set(1) + self.pid = 456 g2.set(2) - self.assertEqual(2, self.registry.get_sample_value('g')) + self.assertEqual(2, self.registry.get_sample_value('gmax', {'hostname':hostname})) def test_gauge_livesum(self): - g1 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum') - - g2 = Gauge('g', 'help', registry=None, multiprocess_mode='livesum') - self.assertEqual(0, self.registry.get_sample_value('g')) + self.pid = 123 + g1 = Gauge('gls', 'help', registry=None, multiprocess_mode='livesum') + self.pid = 456 + g2 = Gauge('gls', 'help', registry=None, multiprocess_mode='livesum') + self.assertEqual(0, self.registry.get_sample_value('gls', {'hostname':hostname})) + self.pid = 123 g1.set(1) + self.pid = 456 g2.set(2) - self.assertEqual(3, self.registry.get_sample_value('g')) - mark_process_dead(123, os.environ['prometheus_multiproc_dir']) - self.assertEqual(2, 
self.registry.get_sample_value('g')) + self.assertEqual(3, self.registry.get_sample_value('gls', {'hostname':hostname})) + from prometheus_client.distributed import mark_distributed_process_dead + mark_distributed_process_dead(123) + self.assertEqual(2, self.registry.get_sample_value('gls', {'hostname':hostname})) def test_namespace_subsystem(self): c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss') @@ -163,25 +176,6 @@ def test_counter_across_forks(self): self.assertEqual(3, self.registry.get_sample_value('c_total')) self.assertEqual(1, c1._value.get()) - def test_initialization_detects_pid_change(self): - self.pid = 0 - - # can not inspect the files cache directly, as it's a closure, so we - # check for the actual files themselves - def files(): - fs = os.listdir(os.environ['prometheus_multiproc_dir']) - fs.sort() - return fs - - c1 = Counter('c1', 'c1', registry=None) - self.assertEqual(files(), ['counter_0.db']) - c2 = Counter('c2', 'c2', registry=None) - self.assertEqual(files(), ['counter_0.db']) - self.pid = 1 - c3 = Counter('c3', 'c3', registry=None) - self.assertEqual(files(), ['counter_0.db', 'counter_1.db']) - - @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.") def test_collect(self): self.pid = 0 @@ -194,7 +188,7 @@ def add_label(key, value): return l c = Counter('c', 'help', labelnames=labels.keys(), registry=None) - g = Gauge('g', 'help', labelnames=labels.keys(), registry=None) + g = Gauge('g', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='liveall') h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) c.labels(**labels).inc(1) @@ -255,78 +249,39 @@ def add_label(key, value): l[key] = value return l - h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) + h = Histogram('hna', 'help', labelnames=labels.keys(), registry=None) h.labels(**labels).observe(1) self.pid = 1 h.labels(**labels).observe(5) - path = 
os.path.join(os.environ['prometheus_multiproc_dir'], '*.db') - files = glob.glob(path) - metrics = dict( - (m.name, m) for m in self.collector.merge(files, accumulate=False) - ) + self.collector.accumulate = False + metrics = self.collector.collect() + self.collector.accumulate = True - metrics['h'].samples.sort( + metric = [x for x in metrics if x.name == 'hna'][0] + metric.samples.sort( key=lambda x: (x[0], float(x[1].get('le', 0))) ) expected_histogram = [ - Sample('h_bucket', add_label('le', '0.005'), 0.0), - Sample('h_bucket', add_label('le', '0.01'), 0.0), - Sample('h_bucket', add_label('le', '0.025'), 0.0), - Sample('h_bucket', add_label('le', '0.05'), 0.0), - Sample('h_bucket', add_label('le', '0.075'), 0.0), - Sample('h_bucket', add_label('le', '0.1'), 0.0), - Sample('h_bucket', add_label('le', '0.25'), 0.0), - Sample('h_bucket', add_label('le', '0.5'), 0.0), - Sample('h_bucket', add_label('le', '0.75'), 0.0), - Sample('h_bucket', add_label('le', '1.0'), 1.0), - Sample('h_bucket', add_label('le', '2.5'), 0.0), - Sample('h_bucket', add_label('le', '5.0'), 1.0), - Sample('h_bucket', add_label('le', '7.5'), 0.0), - Sample('h_bucket', add_label('le', '10.0'), 0.0), - Sample('h_bucket', add_label('le', '+Inf'), 0.0), - Sample('h_sum', labels, 6.0), + Sample('hna_bucket', add_label('le', '0.005'), 0.0), + Sample('hna_bucket', add_label('le', '0.01'), 0.0), + Sample('hna_bucket', add_label('le', '0.025'), 0.0), + Sample('hna_bucket', add_label('le', '0.05'), 0.0), + Sample('hna_bucket', add_label('le', '0.075'), 0.0), + Sample('hna_bucket', add_label('le', '0.1'), 0.0), + Sample('hna_bucket', add_label('le', '0.25'), 0.0), + Sample('hna_bucket', add_label('le', '0.5'), 0.0), + Sample('hna_bucket', add_label('le', '0.75'), 0.0), + Sample('hna_bucket', add_label('le', '1.0'), 1.0), + Sample('hna_bucket', add_label('le', '2.5'), 0.0), + Sample('hna_bucket', add_label('le', '5.0'), 1.0), + Sample('hna_bucket', add_label('le', '7.5'), 0.0), + Sample('hna_bucket', 
add_label('le', '10.0'), 0.0), + Sample('hna_bucket', add_label('le', '+Inf'), 0.0), + Sample('hna_sum', labels, 6.0), ] - self.assertEqual(metrics['h'].samples, expected_histogram) - - -class TestMmapedDict(unittest.TestCase): - def setUp(self): - fd, self.tempfile = tempfile.mkstemp() - os.close(fd) - self.d = core._MmapedDict(self.tempfile) - - def test_process_restart(self): - self.d.write_value('abc', 123.0) - self.d.close() - self.d = core._MmapedDict(self.tempfile) - self.assertEqual(123, self.d.read_value('abc')) - self.assertEqual([('abc', 123.0)], list(self.d.read_all_values())) - - def test_expansion(self): - key = 'a' * core._INITIAL_MMAP_SIZE - self.d.write_value(key, 123.0) - self.assertEqual([(key, 123.0)], list(self.d.read_all_values())) - - def test_multi_expansion(self): - key = 'a' * core._INITIAL_MMAP_SIZE * 4 - self.d.write_value('abc', 42.0) - self.d.write_value(key, 123.0) - self.d.write_value('def', 17.0) - self.assertEqual( - [('abc', 42.0), (key, 123.0), ('def', 17.0)], - list(self.d.read_all_values())) - - def test_corruption_detected(self): - self.d.write_value('abc', 42.0) - # corrupt the written data - self.d._m[8:16] = b'somejunk' - with self.assertRaises(RuntimeError): - list(self.d.read_all_values()) - - def tearDown(self): - os.unlink(self.tempfile) + self.assertEqual(metric.samples, expected_histogram) class TestUnsetEnv(unittest.TestCase): From 15ab51b944ac881999a59ea982b7769d3d1d3bef Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sat, 20 Oct 2018 22:58:36 +0300 Subject: [PATCH 08/33] cache choosing --- prometheus_client/core.py | 2 +- prometheus_client/distributed.py | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index ab3b1bcb..5add4a2e 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -726,7 +726,7 @@ def get(self): # no control over we use an environment variable. 
if 'prometheus_multiproc_dir' in os.environ: _ValueClass = _MultiProcessValue() -elif 'prometheus_distributed' in os.environ: +elif 'prometheus_django_cache' in os.environ: from prometheus_client.distributed import DistributedValue _ValueClass = DistributedValue diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 061f9bad..3c83d978 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -10,7 +10,8 @@ from collections import defaultdict from threading import Lock -from django.core.cache import cache +from django.core.cache import caches +cache = caches[os.environ.get('prometheus_django_cache','default')] hostname = socket.gethostname() @@ -54,16 +55,18 @@ def __exit__(self, type, value, tb): distributed_list_cache_key = 'pc_distributed_list' -distributed_list_lock = CacheLock(distributed_list_cache_key, ttl=3) +distributed_list_lock = CacheLock(distributed_list_cache_key, ttl=20) added_to_distributed_list = set() +distributed_list_ttl_minutes = 60*24*5 +distributed_value_ttl_minutes = 60*24*5 def add_to_distributed_list(pid): if not (hostname, pid) in added_to_distributed_list: with distributed_list_lock: l = cache.get(distributed_list_cache_key, set()) l.add((hostname, pid)) - cache.set(distributed_list_cache_key, l, 60 * 20) + cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes*60) added_to_distributed_list.add((hostname, pid)) @@ -83,7 +86,7 @@ def _iterate(): if (hostname, pid) in added_to_distributed_list: added_to_distributed_list.remove((hostname, pid)) - cache.set(distributed_list_cache_key, l, 60 * 20) + cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes*60) _pidFunc = os.getpid @@ -150,7 +153,7 @@ def __get_dict(self): return cache.get(self.cachekey, {}) def __set_dict(self, dict_value): - cache.set(self.cachekey, dict_value, 60 * 10) + cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes) def __reset(self): with lock: From 
4081b811c973fd78ca07e0073bfdc428f7619211 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sun, 21 Oct 2018 00:43:13 +0300 Subject: [PATCH 09/33] gauge_all temporary allow --- prometheus_client/distributed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 3c83d978..3513aa72 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -132,8 +132,8 @@ class DistributedValue(object): def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs): if typ == 'gauge': - if multiprocess_mode == 'all': - raise Exception('multiprocess_mode=all not supported in distributed storage') + # if multiprocess_mode == 'all': + # raise Exception('multiprocess_mode=all not supported in distributed storage') typ_prefix = typ + '_' + multiprocess_mode else: typ_prefix = typ From e6b5aeefcf7319bbbba25859eb3e45130d8a5a4d Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Sun, 21 Oct 2018 00:57:59 +0300 Subject: [PATCH 10/33] debug --- prometheus_client/distributed.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 3513aa72..db7b741e 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -156,27 +156,27 @@ def __set_dict(self, dict_value): cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes) def __reset(self): - with lock: - d = self.__get_dict() - if not self.valuekey in d: - d[self.valuekey] = 0 - self.__set_dict(d) + # with lock: + d = self.__get_dict() + if not self.valuekey in d: + d[self.valuekey] = 0 + self.__set_dict(d) def inc(self, amount): - with lock: - d = self.__get_dict() - d[self.valuekey] = d.get(self.valuekey, 0) + amount - self.__set_dict(d) + # with lock: + d = self.__get_dict() + d[self.valuekey] = d.get(self.valuekey, 0) + amount + 
self.__set_dict(d) def set(self, value): - with lock: - d = self.__get_dict() - d[self.valuekey] = value - self.__set_dict(d) + # with lock: + d = self.__get_dict() + d[self.valuekey] = value + self.__set_dict(d) def get(self): - with lock: - return self.__get_dict().get(self.valuekey, None) + # with lock: + return self.__get_dict().get(self.valuekey, None) class DistributedCollector(object): From e03c0d21e320424524b670354b957a5c840b2ca7 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 23 Oct 2018 18:31:52 +0300 Subject: [PATCH 11/33] README --- README.md | 554 +++--------------------------------------------------- 1 file changed, 22 insertions(+), 532 deletions(-) diff --git a/README.md b/README.md index a838d309..a933f374 100644 --- a/README.md +++ b/README.md @@ -1,542 +1,32 @@ # Prometheus Python Client -The official Python 2 and 3 client for [Prometheus](http://prometheus.io). +This prometheus client has ability to store all metrics in django cache. +Just set environment variable "prometheus_django_cache" to django cache name, like "default". -## Three Step Demo +## Internals +Each process has separate metrics storage in one cache entry. +Each process is identified by hostname+pid. -**One**: Install the client: -``` -pip install prometheus_client -``` +To allow collector to find all instances that stored any metrics there is global list of all available hostname+pid identifiers stored in cache. Read+Write access to that list is synchronized by distributed lock based on cache. +Process added to that list just once, when first metric value is stored. -**Two**: Paste the following into a Python interpreter: -```python -from prometheus_client import start_http_server, Summary -import random -import time +Gauge modes work like with "prometheus_multiproc_dir", but mode "all" is working like "liveall". -# Create a metric to track time spent and requests made. 
-REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request') +distributed.mark_distributed_process_dead works like multiprocess.mark_process_dead but removes all gauge modes, including mode "all". -# Decorate function with metric. -@REQUEST_TIME.time() -def process_request(t): - """A dummy function that takes some time.""" - time.sleep(t) +The metric storing process has no locking mechanism for reads and writes because metrics storage is per process, with the assumption that there is no threading. -if __name__ == '__main__': - # Start up the server to expose the metrics. - start_http_server(8000) - # Generate some requests. - while True: - process_request(random.random()) -``` +## Advantages +* any distributed application will work out of the box +* no FD leaking like in multiprocessing mode with uwsgi +* no files stored on disk +* theoretically any django cache backend will work (but I have had some trouble with memcached in practice) +* because of django cache api simplicity, any storage can be simply plugged in -**Three**: Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics. +## Disadvantages and things to do +* metrics stored in cache have a ttl (5 days for now) and old counters can be evicted, so a counter value can decrease (ttl refreshing is planned, but the django 1.7 cache API does not support the touch method). +* performance may suffer because of network overhead (no performance tests have been done yet) +* the hostname+pid list has no cleanup process and can grow without bound -From one easy to use decorator you get: - * `request_processing_seconds_count`: Number of times this function was called. - * `request_processing_seconds_sum`: Total amount of time spent in this function. - -Prometheus's `rate` function allows calculation of both requests per second, -and latency over time from this data. - -In addition if you're on Linux the `process` metrics expose CPU, memory and -other information about the process for free!
- -## Installation - -``` -pip install prometheus_client -``` - -This package can be found on -[PyPI](https://pypi.python.org/pypi/prometheus_client). - -## Instrumenting - -Four types of metric are offered: Counter, Gauge, Summary and Histogram. -See the documentation on [metric types](http://prometheus.io/docs/concepts/metric_types/) -and [instrumentation best practices](http://prometheus.io/docs/practices/instrumentation/#counter-vs.-gauge,-summary-vs.-histogram) -on how to use them. - -### Counter - -Counters go up, and reset when the process restarts. - - -```python -from prometheus_client import Counter -c = Counter('my_failures', 'Description of counter') -c.inc() # Increment by 1 -c.inc(1.6) # Increment by given value -``` - -If there is a suffix of `_total` on the metric name, it will be removed. When -exposing the time series for counter, a `_total` suffix will be added. This is -for compatibility between OpenMetrics and the Prometheus text format, as OpenMetrics -requires the `_total` suffix. - -There are utilities to count exceptions raised: - -```python -@c.count_exceptions() -def f(): - pass - -with c.count_exceptions(): - pass - -# Count only one type of exception -with c.count_exceptions(ValueError): - pass -``` - -### Gauge - -Gauges can go up and down. - -```python -from prometheus_client import Gauge -g = Gauge('my_inprogress_requests', 'Description of gauge') -g.inc() # Increment by 1 -g.dec(10) # Decrement by given value -g.set(4.2) # Set to a given value -``` - -There are utilities for common use cases: - -```python -g.set_to_current_time() # Set to current unixtime - -# Increment when entered, decrement when exited. -@g.track_inprogress() -def f(): - pass - -with g.track_inprogress(): - pass -``` - -A Gauge can also take its value from a callback: - -```python -d = Gauge('data_objects', 'Number of objects') -my_dict = {} -d.set_function(lambda: len(my_dict)) -``` - -### Summary - -Summaries track the size and number of events. 
- -```python -from prometheus_client import Summary -s = Summary('request_latency_seconds', 'Description of summary') -s.observe(4.7) # Observe 4.7 (seconds in this case) -``` - -There are utilities for timing code: - -```python -@s.time() -def f(): - pass - -with s.time(): - pass -``` - -The Python client doesn't store or expose quantile information at this time. - -### Histogram - -Histograms track the size and number of events in buckets. -This allows for aggregatable calculation of quantiles. - -```python -from prometheus_client import Histogram -h = Histogram('request_latency_seconds', 'Description of histogram') -h.observe(4.7) # Observe 4.7 (seconds in this case) -``` - -The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. -They can be overridden by passing `buckets` keyword argument to `Histogram`. - -There are utilities for timing code: - -```python -@h.time() -def f(): - pass - -with h.time(): - pass -``` - -### Info - -Info tracks key-value information, usually about a whole target. - -```python -from prometheus_client import Info -i = Info('my_build_version', 'Description of info') -i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) -``` - -### Enum - -Enum tracks which of a set of states something is currently in. - -```python -from prometheus_client import Enum -e = Enum('my_task_state', 'Description of enum', - states=['starting', 'running', 'stopped']) -e.state('running') -``` - -### Labels - -All metrics can have labels, allowing grouping of related time series. - -See the best practices on [naming](http://prometheus.io/docs/practices/naming/) -and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels). 
- -Taking a counter as an example: - -```python -from prometheus_client import Counter -c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) -c.labels('get', '/').inc() -c.labels('post', '/submit').inc() -``` - -Labels can also be passed as keyword-arguments: - -```python -from prometheus_client import Counter -c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) -c.labels(method='get', endpoint='/').inc() -c.labels(method='post', endpoint='/submit').inc() -``` - -### Process Collector - -The Python client automatically exports metrics about process CPU usage, RAM, -file descriptors and start time. These all have the prefix `process`, and -are only currently available on Linux. - -The namespace and pid constructor arguments allows for exporting metrics about -other processes, for example: -``` -ProcessCollector(namespace='mydaemon', pid=lambda: open('/var/run/daemon.pid').read()) -``` - -### Platform Collector - -The client also automatically exports some metadata about Python. If using Jython, -metadata about the JVM in use is also included. This information is available as -labels on the `python_info` metric. The value of the metric is 1, since it is the -labels that carry information. - -## Exporting - -There are several options for exporting metrics. - -### HTTP - -Metrics are usually exposed over HTTP, to be read by the Prometheus server. - -The easiest way to do this is via `start_http_server`, which will start a HTTP -server in a daemon thread on the given port: - -```python -from prometheus_client import start_http_server - -start_http_server(8000) -``` - -Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics. - -To add Prometheus exposition to an existing HTTP server, see the `MetricsHandler` class -which provides a `BaseHTTPRequestHandler`. It also serves as a simple example of how -to write a custom endpoint. 
- -#### Twisted - -To use prometheus with [twisted](https://twistedmatrix.com/), there is `MetricsResource` which exposes metrics as a twisted resource. - -```python -from prometheus_client.twisted import MetricsResource -from twisted.web.server import Site -from twisted.web.resource import Resource -from twisted.internet import reactor - -root = Resource() -root.putChild(b'metrics', MetricsResource()) - -factory = Site(root) -reactor.listenTCP(8000, factory) -reactor.run() -``` - -#### WSGI - -To use Prometheus with [WSGI](http://wsgi.readthedocs.org/en/latest/), there is -`make_wsgi_app` which creates a WSGI application. - -```python -from prometheus_client import make_wsgi_app -from wsgiref.simple_server import make_server - -app = make_wsgi_app() -httpd = make_server('', 8000, app) -httpd.serve_forever() -``` - -Such an application can be useful when integrating Prometheus metrics with WSGI -apps. - -The method `start_wsgi_server` can be used to serve the metrics through the -WSGI reference implementation in a new thread. - -```python -from prometheus_client import start_wsgi_server - -start_wsgi_server(8000) -``` - -#### Flask - -To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example. 
- -Save the snippet below in a `myapp.py` file - -```python -from flask import Flask -from werkzeug.wsgi import DispatcherMiddleware -from prometheus_client import make_wsgi_app - -# Create my app -app = Flask(__name__) - -# Add prometheus wsgi middleware to route /metrics requests -app_dispatch = DispatcherMiddleware(app, { - '/metrics': make_wsgi_app() -}) -``` - -Run the example web application like this - -```bash -# Install uwsgi if you do not have it -pip install uwsgi -uwsgi --http 127.0.0.1:8000 --wsgi-file myapp.py --callable app_dispatch -``` - -Visit http://localhost:8000/metrics to see the metrics - -### Node exporter textfile collector - -The [textfile collector](https://github.com/prometheus/node_exporter#textfile-collector) -allows machine-level statistics to be exported out via the Node exporter. - -This is useful for monitoring cronjobs, or for writing cronjobs to expose metrics -about a machine system that the Node exporter does not support or would not make sense -to perform at every scrape (for example, anything involving subprocesses). - -```python -from prometheus_client import CollectorRegistry, Gauge, write_to_textfile - -registry = CollectorRegistry() -g = Gauge('raid_status', '1 if raid array is okay', registry=registry) -g.set(1) -write_to_textfile('/configured/textfile/path/raid.prom', registry) -``` - -A separate registry is used, as the default registry may contain other metrics -such as those from the Process Collector. - -## Exporting to a Pushgateway - -The [Pushgateway](https://github.com/prometheus/pushgateway) -allows ephemeral and batch jobs to expose their metrics to Prometheus. 
- -```python -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -registry = CollectorRegistry() -g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) -g.set_to_current_time() -push_to_gateway('localhost:9091', job='batchA', registry=registry) -``` - -A separate registry is used, as the default registry may contain other metrics -such as those from the Process Collector. - -Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics -with the same grouping key, `pushadd_to_gateway` only replaces metrics with the -same name and grouping key and `delete_from_gateway` deletes metrics with the -given job and grouping key. See the -[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md) -for more information. - -`instance_ip_grouping_key` returns a grouping key with the instance label set -to the host's IP address. - -### Handlers for authentication - -If the push gateway you are connecting to is protected with HTTP Basic Auth, -you can use a special handler to set the Authorization header. - -```python -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from prometheus_client.exposition import basic_auth_handler - -def my_auth_handler(url, method, timeout, headers, data): - username = 'foobar' - password = 'secret123' - return basic_auth_handler(url, method, timeout, headers, data, username, password) -registry = CollectorRegistry() -g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) -g.set_to_current_time() -push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler) -``` - -## Bridges - -It is also possible to expose metrics to systems other than Prometheus. -This allows you to take advantage of Prometheus instrumentation even -if you are not quite ready to fully transition to Prometheus yet. 
- -### Graphite - -Metrics are pushed over TCP in the Graphite plaintext format. - -```python -from prometheus_client.bridge.graphite import GraphiteBridge - -gb = GraphiteBridge(('graphite.your.org', 2003)) -# Push once. -gb.push() -# Push every 10 seconds in a daemon thread. -gb.start(10.0) -``` - -## Custom Collectors - -Sometimes it is not possible to directly instrument code, as it is not -in your control. This requires you to proxy metrics from other systems. - -To do so you need to create a custom collector, for example: - -```python -from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY - -class CustomCollector(object): - def collect(self): - yield GaugeMetricFamily('my_gauge', 'Help text', value=7) - c = CounterMetricFamily('my_counter_total', 'Help text', labels=['foo']) - c.add_metric(['bar'], 1.7) - c.add_metric(['baz'], 3.8) - yield c - -REGISTRY.register(CustomCollector()) -``` - -`SummaryMetricFamily` and `HistogramMetricFamily` work similarly. - -A collector may implement a `describe` method which returns metrics in the same -format as `collect` (though you don't have to include the samples). This is -used to predetermine the names of time series a `CollectorRegistry` exposes and -thus to detect collisions and duplicate registrations. - -Usually custom collectors do not have to implement `describe`. If `describe` is -not implemented and the CollectorRegistry was created with `auto_desribe=True` -(which is the case for the default registry) then `collect` will be called at -registration time instead of `describe`. If this could cause problems, either -implement a proper `describe`, or if that's not practical have `describe` -return an empty list. - - -## Multiprocess Mode (Gunicorn) - -Prometheus client libaries presume a threaded model, where metrics are shared -across workers. This doesn't work so well for languages such as Python where -it's common to have processes rather than threads to handle large workloads. 
- -To handle this the client library can be put in multiprocess mode. -This comes with a number of limitations: - -- Registries can not be used as normal, all instantiated metrics are exported -- Custom collectors do not work (e.g. cpu and memory metrics) -- Info and Enum metrics do not work -- The pushgateway cannot be used -- Gauges cannot use the `pid` label - -There's several steps to getting this working: - -**One**: Gunicorn deployment - -The `prometheus_multiproc_dir` environment variable must be set to a directory -that the client library can use for metrics. This directory must be wiped -between Gunicorn runs (before startup is recommended). - -Put the following in the config file: -```python -from prometheus_client import multiprocess - -def child_exit(server, worker): - multiprocess.mark_process_dead(worker.pid) -``` - -**Two**: Inside the application -```python -from prometheus_client import multiprocess -from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge - -# Example gauge. -IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum') - - -# Expose metrics. -@IN_PROGRESS.track_inprogress() -def app(environ, start_response): - registry = CollectorRegistry() - multiprocess.MultiProcessCollector(registry) - data = generate_latest(registry) - status = '200 OK' - response_headers = [ - ('Content-type', CONTENT_TYPE_LATEST), - ('Content-Length', str(len(data))) - ] - start_response(status, response_headers) - return iter([data]) -``` - -**Three**: Instrumentation - -Counters, Summarys and Histograms work as normal. - -Gauges have several modes they can run in, which can be selected with the -`multiprocess_mode` parameter. - -- 'all': Default. Return a timeseries per process alive or dead. -- 'liveall': Return a timeseries per process that is still alive. -- 'livesum': Return a single timeseries that is the sum of the values of alive processes. 
-- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead. -- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead. - -## Parser - -The Python client supports parsing the Prometheus text format. -This is intended for advanced use cases where you have servers -exposing Prometheus metrics and need to get them into some other -system. - -```python -from prometheus_client.parser import text_string_to_metric_families -for family in text_string_to_metric_families(u"my_gauge 1.0\n"): - for sample in family.samples: - print("Name: {0} Labels: {1} Value: {2}".format(*sample)) -``` +## Requirements +* Django 1.7+ supported \ No newline at end of file From 6f4a9f2d3666138792abdadbf1ef469f021c0eeb Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 23 Oct 2018 18:32:40 +0300 Subject: [PATCH 12/33] README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index a933f374..1806c9da 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ distributed.mark_process_dead works like multiprocess.mark_process_dead but remo Metric storing process has no locking mechanism for read and write because metrics storage is per process with assumption that there is no theading. +Adapted multiprocess tests is working. + ## Advantages * any distributed application will work out of the box * no FD leaking like in multiprocessing mode with uwsgi From 3a2daed882614166cc3eddf0b8e2764cbc51c092 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 23 Oct 2018 18:51:29 +0300 Subject: [PATCH 13/33] README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1806c9da..58e92376 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Process added to that list just once, when first metric value is stored. Gauge modes work like with "prometheus_multiproc_dir", but mode "all" is working like "liveall". 
-distributed.mark_process_dead works like multiprocess.mark_process_dead but removes all gauge modes include mode "all". +distributed.mark_process_dead works like multiprocess.mark_process_dead but removes all gauge modes including mode "all". Metric storing process has no locking mechanism for read and write because metrics storage is per process with assumption that there is no theading. From 4ee1be89415fd502779e4d134ad271f8bd2498a6 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 23 Oct 2018 18:53:04 +0300 Subject: [PATCH 14/33] README --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 58e92376..14814af0 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,8 @@ Adapted multiprocess tests is working. * metrics stored in cache have ttl (5 days by now) and old counters can be evicted so counter value will decrease (ttl refreshing is planned, but django 1.7 cache API is not supported touch method). * maybe performance can suffer because of network overhead (no performance tests are done by now) * hostname+pid list has no cleaning process and can grow up +* just testing it in production, it's early alpha version +* i plan to test it only with redis backend ## Requirements * Django 1.7+ supported \ No newline at end of file From 2a8ee98e6b9b009cfe1dab49a6150f3e5a2fad65 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 11:16:16 +0300 Subject: [PATCH 15/33] None arg in _floatToGoString --- prometheus_client/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 5add4a2e..c11f0018 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -1092,7 +1092,7 @@ def _floatToGoString(d): return '+Inf' elif d == _MINUS_INF: return '-Inf' - elif math.isnan(d): + elif d is None or math.isnan(d): return 'NaN' else: return repr(float(d)) From 0bdbe136f87a4be2bc25d65ba40702f1735c81a9 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: 
Tue, 6 Nov 2018 11:19:01 +0300 Subject: [PATCH 16/33] Debug --- prometheus_client/distributed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index db7b741e..133cf14b 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -31,8 +31,8 @@ def __nonzero__(self): def __enter__(self): global in_lock - if in_lock: - raise Exception('Already on lock ' + in_lock) + # if in_lock: + # raise Exception('Already on lock ' + in_lock) trys = 6 while trys: self.timeout_at = time.monotonic() + self.ttl From 239f563f85a340632fd8c9ea459d7a5cf899fae8 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 11:20:22 +0300 Subject: [PATCH 17/33] None values --- prometheus_client/core.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index c11f0018..db577b2c 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -1180,7 +1180,9 @@ def _samples(self): samples = [] acc = 0 for i, bound in enumerate(self._upper_bounds): - acc += self._buckets[i].get() + v = self._buckets[i].get() + if v is not None: + acc += v samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) samples.append(('_count', {}, acc)) samples.append(('_sum', {}, self._sum.get())) From 4f94c43dc3090addb2f88e89da929dd86cda99af Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 11:47:02 +0300 Subject: [PATCH 18/33] Debug --- prometheus_client/distributed.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 133cf14b..bd25cd95 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -39,12 +39,32 @@ def __enter__(self): self.status = cache.add(self.id, 'locked', self.ttl) if self.status: in_lock = self.id + + try: + from sentry_sdk import add_breadcrumb + 
add_breadcrumb( + category='lock', + message='locked {self.id}'.format(**locals()), + level='info', + ) + except ImportError: + pass return self.status time.sleep(0.1) trys -= 1 raise Exception('Could not lock for {self.id}'.format(**locals())) def __exit__(self, type, value, tb): + try: + from sentry_sdk import add_breadcrumb + add_breadcrumb( + category='lock', + message='released {self.id}'.format(**locals()), + level='info', + ) + except ImportError: + pass + global in_lock in_lock = None if self.status: From af015e2d6298b6c9f5759f8ae6b4570f64e846f8 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 11:52:32 +0300 Subject: [PATCH 19/33] Debug --- prometheus_client/distributed.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index bd25cd95..449ce6bd 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -19,6 +19,14 @@ lock = Lock() +class FakeSuccessContextManager(object): + def __nonzero__(self): + return True + def __enter__(self): + return self + def __exit__(self, *args): + pass + class CacheLock(object): def __init__(self, lock_id, ttl): self.id = 'cachelock-{0}'.format(lock_id) @@ -30,6 +38,10 @@ def __nonzero__(self): return self.status def __enter__(self): + if self.status: + # Already locked - return fake success lock + return FakeSuccessContextManager().__enter__() + global in_lock # if in_lock: # raise Exception('Already on lock ' + in_lock) From b7de2710f6b8345d81aac881004a3c8b66e3bcd5 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 11:57:34 +0300 Subject: [PATCH 20/33] Debug --- prometheus_client/distributed.py | 1 + 1 file changed, 1 insertion(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 449ce6bd..1be77399 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -84,6 +84,7 @@ def __exit__(self, type, value, tb): 
cache.delete(self.id) if self.id in cache: raise Exception('Id in cache ' + self.id) + self.status = False distributed_list_cache_key = 'pc_distributed_list' From 8583a73b78d0c9b3cfa6767da7b608fd80193711 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 12:00:29 +0300 Subject: [PATCH 21/33] Debug --- prometheus_client/distributed.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 1be77399..1522da76 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -38,9 +38,9 @@ def __nonzero__(self): return self.status def __enter__(self): - if self.status: - # Already locked - return fake success lock - return FakeSuccessContextManager().__enter__() + # if self.status: + # # Already locked - return fake success lock + # return FakeSuccessContextManager().__enter__() global in_lock # if in_lock: @@ -52,18 +52,14 @@ def __enter__(self): if self.status: in_lock = self.id - try: - from sentry_sdk import add_breadcrumb - add_breadcrumb( - category='lock', - message='locked {self.id}'.format(**locals()), - level='info', - ) - except ImportError: - pass + from sentry_sdk import capture_message + capture_message('Locked for {self.id}'.format(**locals())) + return self.status time.sleep(0.1) trys -= 1 + from sentry_sdk import capture_message + capture_message('Could not lock for {self.id}'.format(**locals())) raise Exception('Could not lock for {self.id}'.format(**locals())) def __exit__(self, type, value, tb): From 0b74d1a0a0ba0b4a15cfc196ed477a9b864fe6ca Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 12:04:55 +0300 Subject: [PATCH 22/33] Debug --- prometheus_client/distributed.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 1522da76..255969da 100644 --- a/prometheus_client/distributed.py +++ 
b/prometheus_client/distributed.py @@ -52,14 +52,20 @@ def __enter__(self): if self.status: in_lock = self.id - from sentry_sdk import capture_message - capture_message('Locked for {self.id}'.format(**locals())) + from sentry_sdk import capture_exception + try: + raise Exception('Locked for {self.id}'.format(**locals())) + except Exception as e: + capture_exception(e) return self.status time.sleep(0.1) trys -= 1 - from sentry_sdk import capture_message - capture_message('Could not lock for {self.id}'.format(**locals())) + from sentry_sdk import capture_exception + try: + raise Exception('Could not lock for {self.id}'.format(**locals())) + except Exception as e: + capture_exception(e) raise Exception('Could not lock for {self.id}'.format(**locals())) def __exit__(self, type, value, tb): From 518bb79e073157f904c4f0476e530c61f119448c Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 12:15:58 +0300 Subject: [PATCH 23/33] Debug --- prometheus_client/distributed.py | 86 +++++++++++++------------------- 1 file changed, 35 insertions(+), 51 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 255969da..55973851 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -2,16 +2,19 @@ from __future__ import unicode_literals +import gc import inspect import json import os import socket import time from collections import defaultdict +from contextlib import contextmanager from threading import Lock from django.core.cache import caches -cache = caches[os.environ.get('prometheus_django_cache','default')] + +cache = caches[os.environ.get('prometheus_django_cache', 'default')] hostname = socket.gethostname() @@ -19,14 +22,18 @@ lock = Lock() + class FakeSuccessContextManager(object): def __nonzero__(self): return True + def __enter__(self): return self + def __exit__(self, *args): pass + class CacheLock(object): def __init__(self, lock_id, ttl): self.id = 'cachelock-{0}'.format(lock_id) @@ 
-38,47 +45,18 @@ def __nonzero__(self): return self.status def __enter__(self): - # if self.status: - # # Already locked - return fake success lock - # return FakeSuccessContextManager().__enter__() - - global in_lock - # if in_lock: - # raise Exception('Already on lock ' + in_lock) trys = 6 while trys: self.timeout_at = time.monotonic() + self.ttl self.status = cache.add(self.id, 'locked', self.ttl) if self.status: in_lock = self.id - - from sentry_sdk import capture_exception - try: - raise Exception('Locked for {self.id}'.format(**locals())) - except Exception as e: - capture_exception(e) - return self.status time.sleep(0.1) trys -= 1 - from sentry_sdk import capture_exception - try: - raise Exception('Could not lock for {self.id}'.format(**locals())) - except Exception as e: - capture_exception(e) raise Exception('Could not lock for {self.id}'.format(**locals())) def __exit__(self, type, value, tb): - try: - from sentry_sdk import add_breadcrumb - add_breadcrumb( - category='lock', - message='released {self.id}'.format(**locals()), - level='info', - ) - except ImportError: - pass - global in_lock in_lock = None if self.status: @@ -93,35 +71,41 @@ def __exit__(self, type, value, tb): distributed_list_lock = CacheLock(distributed_list_cache_key, ttl=20) added_to_distributed_list = set() -distributed_list_ttl_minutes = 60*24*5 -distributed_value_ttl_minutes = 60*24*5 +distributed_list_ttl_minutes = 60 * 24 * 5 +distributed_value_ttl_minutes = 60 * 24 * 5 + + + +@contextmanager +def gc_disabled(): + was_enabled_previously = gc.isenabled() + gc.disable() + yield + if was_enabled_previously: + gc.enable() + def add_to_distributed_list(pid): if not (hostname, pid) in added_to_distributed_list: - with distributed_list_lock: - l = cache.get(distributed_list_cache_key, set()) - l.add((hostname, pid)) - cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes*60) - added_to_distributed_list.add((hostname, pid)) + with gc_disabled(): + with 
distributed_list_lock: + l = cache.get(distributed_list_cache_key, set()) + l.add((hostname, pid)) + cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes * 60) + added_to_distributed_list.add((hostname, pid)) def remove_from_distributed_list(pid): - with distributed_list_lock: - l = cache.get(distributed_list_cache_key, set()) - - def _iterate(): - if isinstance(pid, int): - yield pid - else: - for p in pid: - yield p + with gc_disabled(): + with distributed_list_lock: + l = cache.get(distributed_list_cache_key, set()) - if (hostname, pid) in l: - l.remove((hostname, pid)) - if (hostname, pid) in added_to_distributed_list: - added_to_distributed_list.remove((hostname, pid)) + if (hostname, pid) in l: + l.remove((hostname, pid)) + if (hostname, pid) in added_to_distributed_list: + added_to_distributed_list.remove((hostname, pid)) - cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes*60) + cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes * 60) _pidFunc = os.getpid From f2469336b4b5897d4e3cb25fc65b5c989f815dc2 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 13:02:57 +0300 Subject: [PATCH 24/33] Debug --- prometheus_client/distributed.py | 2 +- prometheus_client/gc_collector.py | 2 ++ tests/test_distibuted.py | 20 +++++++++++++++----- 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 55973851..d4d9792b 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -233,7 +233,7 @@ def merge(self, cache_values, accumulate=True): metric = metrics.get(metric_name) if metric is None: - metric = core.Metric(metric_name, 'Multiprocess metric', typ) + metric = core.Metric(metric_name, 'Distributed metric', typ) metrics[metric_name] = metric if typ == 'gauge': diff --git a/prometheus_client/gc_collector.py b/prometheus_client/gc_collector.py index 5acbeedb..06002609 100644 --- 
a/prometheus_client/gc_collector.py +++ b/prometheus_client/gc_collector.py @@ -18,6 +18,8 @@ def __init__(self, registry=core.REGISTRY, gc=gc): # the GC collector is always disabled in multiprocess mode. if 'prometheus_multiproc_dir' in os.environ: return + if 'prometheus_django_cache' in os.environ: + return if not hasattr(gc, 'callbacks'): return diff --git a/tests/test_distibuted.py b/tests/test_distibuted.py index 2cb63884..04504b4c 100644 --- a/tests/test_distibuted.py +++ b/tests/test_distibuted.py @@ -97,11 +97,11 @@ def test_histogram_adds(self): self.assertEqual(3, self.registry.get_sample_value('h_sum')) self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'})) - def test_gauge_all(self): - self.pid = 123 - with self.assertRaises(Exception) as e: - g1 = Gauge('g1', 'help', registry=None) - self.assertTrue('not supported' in str(e.exception)) + # def test_gauge_all(self): + # self.pid = 123 + # with self.assertRaises(Exception) as e: + # g1 = Gauge('g1', 'help', registry=None) + # self.assertTrue('not supported' in str(e.exception)) def test_gauge_liveall(self): self.pid = 123 @@ -189,16 +189,20 @@ def add_label(key, value): c = Counter('c', 'help', labelnames=labels.keys(), registry=None) g = Gauge('g', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='liveall') + gall = Gauge('gall', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='all') + gempty = Gauge('gempty', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='all') h = Histogram('h', 'help', labelnames=labels.keys(), registry=None) c.labels(**labels).inc(1) g.labels(**labels).set(1) + gall.labels(**labels).set(1) h.labels(**labels).observe(1) self.pid = 1 c.labels(**labels).inc(1) g.labels(**labels).set(1) + gall.labels(**labels).set(1) h.labels(**labels).observe(5) metrics = dict((m.name, m) for m in self.collector.collect()) @@ -209,10 +213,16 @@ def add_label(key, value): metrics['g'].samples.sort(key=lambda x: 
x[1]['pid']) for sample in metrics['g'].samples: sample.labels.pop('hostname') + for sample in metrics['gall'].samples: + sample.labels.pop('hostname') self.assertEqual(metrics['g'].samples, [ Sample('g', add_label('pid', '0'), 1.0), Sample('g', add_label('pid', '1'), 1.0), ]) + self.assertEqual(metrics['gall'].samples, [ + Sample('gall', add_label('pid', '0'), 1.0), + Sample('gall', add_label('pid', '1'), 1.0), + ]) metrics['h'].samples.sort( key=lambda x: (x[0], float(x[1].get('le', 0))) From b6aecc733f25cb62aef2618cc2cb80bbbc9dcfae Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 15:07:34 +0300 Subject: [PATCH 25/33] gauge multiprocess mode "last" --- prometheus_client/core.py | 5 +++-- prometheus_client/distributed.py | 35 ++++++++++++++++++++++---------- tests/test_distibuted.py | 8 ++++++++ 3 files changed, 35 insertions(+), 13 deletions(-) diff --git a/prometheus_client/core.py b/prometheus_client/core.py index db577b2c..217ac787 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -971,11 +971,12 @@ def f(): ''' _type = 'gauge' _reserved_labelnames = [] - _MULTIPROC_MODES = frozenset(('min', 'max', 'livesum', 'liveall', 'all')) + _MULTIPROC_MODES = frozenset(('min', 'max', 'livesum', 'liveall', 'all', 'last')) def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'): if (_ValueClass._multiprocess and - multiprocess_mode not in self._MULTIPROC_MODES): + (multiprocess_mode not in self._MULTIPROC_MODES or multiprocess_mode == 'last')): + raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode) self._value = _ValueClass( self._type, name, name, labelnames, labelvalues, diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index d4d9792b..7cb714e0 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -175,27 +175,33 @@ def __set_dict(self, dict_value): cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes) def __reset(self): 
- # with lock: + ts = int(time.time()) d = self.__get_dict() if not self.valuekey in d: - d[self.valuekey] = 0 + d[self.valuekey] = (0, ts) self.__set_dict(d) def inc(self, amount): - # with lock: + ts = int(time.time()) d = self.__get_dict() - d[self.valuekey] = d.get(self.valuekey, 0) + amount + v = d.get(self.valuekey, 0) + if isinstance(v, tuple): + v = v[0] + d[self.valuekey] = (v + amount, ts) self.__set_dict(d) def set(self, value): - # with lock: + ts = int(time.time()) d = self.__get_dict() - d[self.valuekey] = value + d[self.valuekey] = (value, ts) self.__set_dict(d) def get(self): # with lock: - return self.__get_dict().get(self.valuekey, None) + v = self.__get_dict().get(self.valuekey, None) + if isinstance(v, tuple): + v = v[0] + return v class DistributedCollector(object): @@ -227,7 +233,7 @@ def merge(self, cache_values, accumulate=True): multiprocess_mode = None if '_' in typ: typ, multiprocess_mode = typ.split('_') - for key, value in cache_value.items(): + for key, (value, value_ts) in cache_value.items(): metric_name, name, labels = json.loads(key) labels_key = tuple(sorted(labels.items())) @@ -240,7 +246,7 @@ def merge(self, cache_values, accumulate=True): metric._multiprocess_mode = multiprocess_mode metric.add_sample(name, labels_key + (('pid', pid), ('hostname', value_hostname) - ), value) + ), value, timestamp=value_ts) else: # The duplicates and labels are fixed in the next for. 
metric.add_sample(name, labels_key, value) @@ -248,8 +254,10 @@ def merge(self, cache_values, accumulate=True): for metric in metrics.values(): samples = defaultdict(float) buckets = {} + samples_ts = {} + for s in metric.samples: - name, labels, value = s.name, s.labels, s.value + name, labels, value, value_ts = s.name, s.labels, s.value, s.timestamp if metric.type == 'gauge': without_pid = tuple(l for l in labels if l[0] != 'pid') if metric._multiprocess_mode == 'min': @@ -262,6 +270,11 @@ def merge(self, cache_values, accumulate=True): samples[(s.name, without_pid)] = value elif metric._multiprocess_mode == 'livesum': samples[(name, without_pid)] += value + elif metric._multiprocess_mode == 'last': + current_ts = samples_ts.setdefault((name, without_pid), value_ts) + if value_ts >= current_ts: + samples[(name, without_pid)] = value + samples_ts[(name, without_pid)] = value_ts else: # all/liveall samples[(name, labels)] = value @@ -299,7 +312,7 @@ def merge(self, cache_values, accumulate=True): samples[(metric.name + '_count', labels)] = acc # Convert to correct sample format. 
- metric.samples = [core.Sample(name, dict(labels), value) for (name, labels), value in samples.items()] + metric.samples = [core.Sample(name, dict(labels), value, samples_ts.get((name, labels))) for (name, labels), value in samples.items()] return metrics.values() diff --git a/tests/test_distibuted.py b/tests/test_distibuted.py index 04504b4c..96c3bb35 100644 --- a/tests/test_distibuted.py +++ b/tests/test_distibuted.py @@ -103,6 +103,14 @@ def test_histogram_adds(self): # g1 = Gauge('g1', 'help', registry=None) # self.assertTrue('not supported' in str(e.exception)) + def test_gauge_last(self): + self.pid = 123 + g1 = Gauge('g1last', 'help', registry=None, multiprocess_mode='last') + g1.set(1) + self.pid = 456 + g1.set(2) + self.assertEqual(2, self.registry.get_sample_value('g1last', {'hostname': hostname})) + def test_gauge_liveall(self): self.pid = 123 g1 = Gauge('g2', 'help', registry=None, multiprocess_mode='liveall') From bba2ebde45737b8a5f1d33e573e12794e26c6dbc Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 15:14:31 +0300 Subject: [PATCH 26/33] gauge multiprocess mode "last" --- prometheus_client/distributed.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 7cb714e0..968d3dfd 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -233,7 +233,11 @@ def merge(self, cache_values, accumulate=True): multiprocess_mode = None if '_' in typ: typ, multiprocess_mode = typ.split('_') - for key, (value, value_ts) in cache_value.items(): + for key, value in cache_value.items(): + if isinstance(value, tuple): + value, value_ts = value + else: + value_ts = None metric_name, name, labels = json.loads(key) labels_key = tuple(sorted(labels.items())) From 1369c342e3fae8c5193e19ba73e045d46830fe16 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 23:25:05 +0300 Subject: [PATCH 27/33] Debug --- 
prometheus_client/distributed.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 968d3dfd..f11abeca 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -275,6 +275,8 @@ def merge(self, cache_values, accumulate=True): elif metric._multiprocess_mode == 'livesum': samples[(name, without_pid)] += value elif metric._multiprocess_mode == 'last': + if not value_ts: + raise Exception('no timestamp for "' + str((name, without_pid)) + '"') current_ts = samples_ts.setdefault((name, without_pid), value_ts) if value_ts >= current_ts: samples[(name, without_pid)] = value From a8827fa0ade1d33fe057e78aa0193dfd9d5acc7d Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 23:36:13 +0300 Subject: [PATCH 28/33] Debug --- prometheus_client/distributed.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index f11abeca..f32e9da8 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -276,7 +276,8 @@ def merge(self, cache_values, accumulate=True): samples[(name, without_pid)] += value elif metric._multiprocess_mode == 'last': if not value_ts: - raise Exception('no timestamp for "' + str((name, without_pid)) + '"') + # Some wrong data, possible from previous versions + value_ts = int(time.time()) current_ts = samples_ts.setdefault((name, without_pid), value_ts) if value_ts >= current_ts: samples[(name, without_pid)] = value From 0c9b28110a5cac59bab9bc03b5c8db3cd4a716c2 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 23:36:48 +0300 Subject: [PATCH 29/33] Debug --- prometheus_client/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index f32e9da8..3a815322 100644 --- a/prometheus_client/distributed.py +++ 
b/prometheus_client/distributed.py @@ -277,7 +277,7 @@ def merge(self, cache_values, accumulate=True): elif metric._multiprocess_mode == 'last': if not value_ts: # Some wrong data, possible from previous versions - value_ts = int(time.time()) + value_ts = int(time.time()) - 60*60*5 current_ts = samples_ts.setdefault((name, without_pid), value_ts) if value_ts >= current_ts: samples[(name, without_pid)] = value From 628fc5c53ee976d6408e7f58cb9a45c59731b821 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 23:55:26 +0300 Subject: [PATCH 30/33] No host for gauge_last --- prometheus_client/distributed.py | 7 ++++--- tests/test_distibuted.py | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 3a815322..3e38b019 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -275,13 +275,14 @@ def merge(self, cache_values, accumulate=True): elif metric._multiprocess_mode == 'livesum': samples[(name, without_pid)] += value elif metric._multiprocess_mode == 'last': + without_hostname_pid = tuple(l for l in labels if l[0] != 'pid' and l[0] != 'hostname') if not value_ts: # Some wrong data, possible from previous versions value_ts = int(time.time()) - 60*60*5 - current_ts = samples_ts.setdefault((name, without_pid), value_ts) + current_ts = samples_ts.setdefault((name, without_hostname_pid), value_ts) if value_ts >= current_ts: - samples[(name, without_pid)] = value - samples_ts[(name, without_pid)] = value_ts + samples[(name, without_hostname_pid)] = value + samples_ts[(name, without_hostname_pid)] = value_ts else: # all/liveall samples[(name, labels)] = value diff --git a/tests/test_distibuted.py b/tests/test_distibuted.py index 96c3bb35..384a7610 100644 --- a/tests/test_distibuted.py +++ b/tests/test_distibuted.py @@ -109,7 +109,7 @@ def test_gauge_last(self): g1.set(1) self.pid = 456 g1.set(2) - self.assertEqual(2, 
self.registry.get_sample_value('g1last', {'hostname': hostname})) + self.assertEqual(2, self.registry.get_sample_value('g1last')) def test_gauge_liveall(self): self.pid = 123 From af84d4e29ec891bba2f43932ecd020d2f0c3850a Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Tue, 6 Nov 2018 23:57:32 +0300 Subject: [PATCH 31/33] Readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 14814af0..c834f4fa 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,8 @@ This prometheus client has ability to store all metrics in django cache. Just set environment variable "prometheus_django_cache" to django cache name, like "default". +Gauge has new mode "last" - only last value is collected. + ## Internals Each process has separate metrics storage in one cache entry. Each process is identified by hostname+pid. From ecd367c2c64168bd5d207f49694e2a67f478eb25 Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Wed, 7 Nov 2018 16:06:35 +0300 Subject: [PATCH 32/33] low ttl error --- prometheus_client/distributed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 3e38b019..9d7cbeea 100644 --- a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -172,7 +172,7 @@ def __get_dict(self): return cache.get(self.cachekey, {}) def __set_dict(self, dict_value): - cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes) + cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes*60) def __reset(self): ts = int(time.time()) From 7de3450f1acf2a329ce971ae858792136160f05d Mon Sep 17 00:00:00 2001 From: sovetnikov Date: Mon, 21 Oct 2019 15:12:06 +0300 Subject: [PATCH 33/33] more time to wait on lock --- prometheus_client/distributed.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py index 9d7cbeea..de53170c 100644 --- 
a/prometheus_client/distributed.py +++ b/prometheus_client/distributed.py @@ -2,6 +2,8 @@ from __future__ import unicode_literals +from random import randint + import gc import inspect import json @@ -45,14 +47,14 @@ def __nonzero__(self): return self.status def __enter__(self): - trys = 6 + trys = 40 while trys: self.timeout_at = time.monotonic() + self.ttl self.status = cache.add(self.id, 'locked', self.ttl) if self.status: in_lock = self.id return self.status - time.sleep(0.1) + time.sleep(randint(1,10)/10) trys -= 1 raise Exception('Could not lock for {self.id}'.format(**locals()))