From ddb23a1e324deede95c2c58cd128e170d1276166 Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Tue, 17 Mar 2015 19:11:55 +0100 Subject: [PATCH 1/8] Add support for exporting metrics to a pushgateway --- README.md | 16 ++++++++++++++++ prometheus_client/__init__.py | 30 ++++++++++++++++++++++++++++++ tests/test_client.py | 30 ++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/README.md b/README.md index fedfd1e0..20b220f8 100644 --- a/README.md +++ b/README.md @@ -181,3 +181,19 @@ write_to_textfile('/configured/textfile/path/raid.prom', registry) ``` A separate registry is used, as the default registry may contain other metrics. + +## Exporting to a Pushgateway + +The [Pushgateway](https://github.com/prometheus/pushgateway) +allows ephemeral and batch jobs to expose their metrics to Prometheus. + +```python +from prometheus_client import CollectorRegistry,Gauge,build_pushgateway_url,push_to_gateway +registry = CollectorRegistry() +g = Gauge('raid_status', '1 if raid array is okay', registry=registry) +g.set(1) +url = build_pushgateway_url(job='cooljob', host='pushgateway.mydomain') +push_to_gateway(url, registry) +``` + +A separate registry is used, as the default registry may contain other metrics. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 04075779..0d3ea04b 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -5,8 +5,10 @@ import copy import re import os +import socket import time import threading +import urllib2 try: from BaseHTTPServer import BaseHTTPRequestHandler except ImportError: @@ -15,6 +17,7 @@ from functools import wraps from threading import Lock + __all__ = ['Counter', 'Gauge', 'Summary', 'Histogram'] _METRIC_NAME_RE = re.compile(r'^[a-zA-Z_:][a-zA-Z0-9_:]*$') @@ -434,6 +437,33 @@ def do_GET(self): self.wfile.write(generate_latest(REGISTRY)) +def build_pushgateway_url(job, instance=None, host='localhost', port=9091, + use_fqdn_as_instance=True): + ''' + Build a valid pushgateway url + ''' + + if instance is None and use_fqdn_as_instance: + instance = socket.getfqdn() + + if instance: + instancestr = '/instances/{}'.format(instance) + else: + instancestr = '' + + url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, job, instancestr) + return url + + +def push_to_gateway(url, registry, timeout=None): + '''Push metrics to the given url''' + + resp = urllib2.urlopen(url, data=generate_latest(registry), timeout=timeout) + if resp.code >= 400: + raise IOError("error pushing to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + + def write_to_textfile(path, registry): '''Write metrics to the given path. diff --git a/tests/test_client.py b/tests/test_client.py index a28f3f4a..b14fcc5a 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,8 +1,10 @@ from __future__ import unicode_literals +import socket import unittest from prometheus_client import Gauge, Counter, Summary, Histogram from prometheus_client import CollectorRegistry, generate_latest +from prometheus_client import build_pushgateway_url class TestCounter(unittest.TestCase): @@ -266,5 +268,33 @@ def test_escaping(self): self.assertEqual(b'# HELP cc A\\ncount\\\\er\n# TYPE cc counter\ncc{a="\\\\x\\n\\""} 1.0\n', generate_latest(self.registry)) +class TestBuildPushgatewayUrl(unittest.TestCase): + def test_job_instance(self): + expected = 'http://localhost:9091/metrics/jobs/foojob/instances/fooinstance' + + url = build_pushgateway_url('foojob', 'fooinstance') + self.assertEqual(url, expected) + + def test_host_port(self): + expected = 'http://foohost:9092/metrics/jobs/foojob' + + url = build_pushgateway_url('foojob', host='foohost', port=9092, + use_fqdn_as_instance=False) + self.assertEqual(url, expected) + + def test_no_fqdn(self): + expected = 'http://localhost:9091/metrics/jobs/foojob' + + url = build_pushgateway_url(job='foojob', use_fqdn_as_instance=False) + self.assertEqual(url, expected) + + def test_fqdn(self): + fqdn = socket.getfqdn() + expected = 'http://localhost:9091/metrics/jobs/foojob/instances/' + fqdn + + url = build_pushgateway_url(job='foojob') + self.assertEqual(url, expected) + + if __name__ == '__main__': unittest.main() From e1fb108f7d459ab1d1a64a8345a1bb950ba9c0de Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Mon, 23 Mar 2015 19:46:28 +0100 Subject: [PATCH 2/8] Remove logic for using FQDN as instance name --- prometheus_client/__init__.py | 7 +------ tests/test_client.py | 17 +---------------- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 0d3ea04b..3181c784 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -5,7 +5,6 @@ import copy import re import os -import socket import time import threading import urllib2 @@ -437,15 +436,11 @@ def do_GET(self): self.wfile.write(generate_latest(REGISTRY)) -def build_pushgateway_url(job, instance=None, host='localhost', port=9091, - use_fqdn_as_instance=True): +def build_pushgateway_url(job, instance=None, host='localhost', port=9091): ''' Build a valid pushgateway url ''' - if instance is None and use_fqdn_as_instance: - instance = socket.getfqdn() - if instance: instancestr = '/instances/{}'.format(instance) else: diff --git a/tests/test_client.py b/tests/test_client.py index b14fcc5a..4478c8e8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,5 +1,4 @@ from __future__ import unicode_literals -import socket import unittest from prometheus_client import Gauge, Counter, Summary, Histogram @@ -278,21 +277,7 @@ def test_job_instance(self): def test_host_port(self): expected = 'http://foohost:9092/metrics/jobs/foojob' - url = build_pushgateway_url('foojob', host='foohost', port=9092, - use_fqdn_as_instance=False) - self.assertEqual(url, expected) - - def test_no_fqdn(self): - expected = 'http://localhost:9091/metrics/jobs/foojob' - - url = build_pushgateway_url(job='foojob', use_fqdn_as_instance=False) - self.assertEqual(url, expected) - - def test_fqdn(self): - fqdn = socket.getfqdn() - expected = 'http://localhost:9091/metrics/jobs/foojob/instances/' + fqdn - - url = build_pushgateway_url(job='foojob') + url = build_pushgateway_url('foojob', host='foohost', port=9092) self.assertEqual(url, expected) From 7bfdf8b655c0a27e81da9350fe02cbfab1127363 Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Mon, 23 Mar 2015 19:56:18 +0100 Subject: [PATCH 3/8] Encapsulate url construction --- README.md | 5 ++--- prometheus_client/__init__.py | 9 ++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 20b220f8..644aea89 100644 --- a/README.md +++ b/README.md @@ -188,12 +188,11 @@ The [Pushgateway](https://github.com/prometheus/pushgateway) allows ephemeral and batch jobs to expose their metrics to Prometheus. ```python -from prometheus_client import CollectorRegistry,Gauge,build_pushgateway_url,push_to_gateway +from prometheus_client import CollectorRegistry,Gauge,push_to_gateway registry = CollectorRegistry() g = Gauge('raid_status', '1 if raid array is okay', registry=registry) g.set(1) -url = build_pushgateway_url(job='cooljob', host='pushgateway.mydomain') -push_to_gateway(url, registry) +push_to_gateway(registry, job='somejob', host='pushgateway.mydomain') ``` A separate registry is used, as the default registry may contain other metrics. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 3181c784..bdfc234f 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -450,7 +450,7 @@ def build_pushgateway_url(job, instance=None, host='localhost', port=9091): return url -def push_to_gateway(url, registry, timeout=None): +def push_to_gateway_url(url, registry, timeout=None): '''Push metrics to the given url''' resp = urllib2.urlopen(url, data=generate_latest(registry), timeout=timeout) @@ -459,6 +459,13 @@ def push_to_gateway(url, registry, timeout=None): resp.code, resp.msg)) +def push_to_gateway(registry, job, instance=None, host='localhost', port=9091, timeout=None): + '''Push metrics to a pushgateway''' + + url = build_pushgateway_url(job, instance, host, port) + push_to_gateway_url(url, registry, timeout) + + def write_to_textfile(path, registry): '''Write metrics to the given path. From b5542272b826d49a64c586bd288cd7f1ec330a7b Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Mon, 23 Mar 2015 20:09:42 +0100 Subject: [PATCH 4/8] Use set_to_current_time as better example --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 644aea89..434baa20 100644 --- a/README.md +++ b/README.md @@ -190,9 +190,9 @@ allows ephemeral and batch jobs to expose their metrics to Prometheus. ```python from prometheus_client import CollectorRegistry,Gauge,push_to_gateway registry = CollectorRegistry() -g = Gauge('raid_status', '1 if raid array is okay', registry=registry) -g.set(1) -push_to_gateway(registry, job='somejob', host='pushgateway.mydomain') +g = Gauge('job_finished_ok_at', 'last time a batch job successfully finished', registry=registry) +g.set_to_current_time() +push_to_gateway(registry, job='batchA', host='pushgateway.mydomain') ``` A separate registry is used, as the default registry may contain other metrics. From 4065c3b5291a97d4c6fc2f3080dd5e2e80bd1c76 Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Mon, 23 Mar 2015 21:32:43 +0100 Subject: [PATCH 5/8] Url escape job and instance values --- prometheus_client/__init__.py | 4 +++- tests/test_client.py | 7 +++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index bdfc234f..c18e83b0 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -446,7 +446,9 @@ def build_pushgateway_url(job, instance=None, host='localhost', port=9091): else: instancestr = '' - url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, job, instancestr) + url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, + urllib2.quote(job), + urllib2.quote(instancestr)) return url diff --git a/tests/test_client.py b/tests/test_client.py index 4478c8e8..049729b8 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -280,6 +280,13 @@ def test_host_port(self): url = build_pushgateway_url('foojob', host='foohost', port=9092) self.assertEqual(url, expected) + def test_url_escaping(self): + expected = 'http://localhost:9091/metrics/jobs/foo%20job' + + url = build_pushgateway_url('foo job') + self.assertEqual(url, expected) + + if __name__ == '__main__': unittest.main() From ec4564fbbd1393b19af930cd9cbf90b77d390e07 Mon Sep 17 00:00:00 2001 From: Maykel Moya Date: Mon, 23 Mar 2015 21:33:37 +0100 Subject: [PATCH 6/8] py3 compat --- prometheus_client/__init__.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index c18e83b0..65bb7324 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -7,7 +7,14 @@ import os import time import threading -import urllib2 + +try: + from urllib2 import urlopen, quote +except ImportError: + # Python 3 + from urllib.request import urlopen + from urllib.parse import quote + try: from BaseHTTPServer import BaseHTTPRequestHandler except ImportError: @@ -447,15 +454,15 @@ def build_pushgateway_url(job, instance=None, host='localhost', port=9091): instancestr = '' url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, - urllib2.quote(job), - urllib2.quote(instancestr)) + quote(job), + quote(instancestr)) return url def push_to_gateway_url(url, registry, timeout=None): '''Push metrics to the given url''' - resp = urllib2.urlopen(url, data=generate_latest(registry), timeout=timeout) + resp = urlopen(url, data=generate_latest(registry), timeout=timeout) if resp.code >= 400: raise IOError("error pushing to pushgateway: {0} {1}".format( resp.code, resp.msg)) From bc3240f2f24f36c91793ce4962181497335661c4 Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 5 May 2015 22:20:23 +0100 Subject: [PATCH 7/8] Allow for other ways for values to be managed. --- prometheus_client/__init__.py | 131 +++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 59 deletions(-) diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 59af12d2..29fa38eb 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -96,11 +96,34 @@ def add_sample(self, name, labels, value): self._samples.append((name, labels, value)) +class _MutexValue(object): + '''A float protected by a mutex.''' + + def __init__(self, name, labelnames, labelvalues): + self._value = 0.0 + self._lock = Lock() + + def inc(self, amount): + with self._lock: + self._value += amount + + def set(self, value): + with self._lock: + self._value = value + + def get(self): + with self._lock: + return self._value + +_ValueClass = _MutexValue + + class _LabelWrapper(object): '''Handles labels for the wrapped metric.''' - def __init__(self, wrappedClass, labelnames, **kwargs): + def __init__(self, wrappedClass, name, labelnames, **kwargs): self._wrappedClass = wrappedClass self._type = wrappedClass._type + self._name = name self._labelnames = labelnames self._kwargs = kwargs self._lock = Lock() @@ -130,7 +153,7 @@ def labels(self, *labelvalues): labelvalues = tuple([unicode(l) for l in labelvalues]) with self._lock: if labelvalues not in self._metrics: - self._metrics[labelvalues] = self._wrappedClass(**self._kwargs) + self._metrics[labelvalues] = self._wrappedClass(self._name, self._labelnames, labelvalues, **self._kwargs) return self._metrics[labelvalues] def remove(self, *labelvalues): @@ -153,7 +176,15 @@ def _samples(self): def _MetricWrapper(cls): '''Provides common functionality for metrics.''' def init(name, documentation, labelnames=(), namespace='', subsystem='', registry=REGISTRY, **kwargs): + full_name = '' + if namespace: + full_name += namespace + '_' + if subsystem: + full_name += subsystem + '_' + full_name += name + if labelnames: + labelnames = tuple(labelnames) for l in labelnames: if not _METRIC_LABEL_NAME_RE.match(l): raise ValueError('Invalid label metric name: ' + l) @@ -161,16 +192,9 @@ def init(name, documentation, labelnames=(), namespace='', subsystem='', registr raise ValueError('Reserved label metric name: ' + l) if l in cls._reserved_labelnames: raise ValueError('Reserved label metric name: ' + l) - collector = _LabelWrapper(cls, labelnames, **kwargs) + collector = _LabelWrapper(cls, name, labelnames, **kwargs) else: - collector = cls(**kwargs) - - full_name = '' - if namespace: - full_name += namespace + '_' - if subsystem: - full_name += subsystem + '_' - full_name += name + collector = cls(name, labelnames, (), **kwargs) if not _METRIC_NAME_RE.match(full_name): raise ValueError('Invalid metric name: ' + full_name) @@ -194,16 +218,14 @@ class Counter(object): _type = 'counter' _reserved_labelnames = [] - def __init__(self): - self._value = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._value = _ValueClass(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment counter by the given amount.''' if amount < 0: raise ValueError('Counters can only be incremented by non-negative amounts.') - with self._lock: - self._value += amount + self._value.inc(amount) def count_exceptions(self, exception=Exception): '''Count exceptions in a block of code or function. @@ -234,8 +256,7 @@ def wrapped(*args, **kwargs): return ExceptionCounter(self) def _samples(self): - with self._lock: - return (('', {}, self._value), ) + return (('', {}, self._value.get()), ) @_MetricWrapper @@ -243,24 +264,20 @@ class Gauge(object): _type = 'gauge' _reserved_labelnames = [] - def __init__(self): - self._value = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._value = _ValueClass(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment gauge by the given amount.''' - with self._lock: - self._value += amount + self._value.inc(amount) def dec(self, amount=1): '''Decrement gauge by the given amount.''' - with self._lock: - self._value -= amount + self._value.inc(-amount) def set(self, value): '''Set gauge to the given value.''' - with self._lock: - self._value = float(value) + self._value.set(float(value)) def set_to_current_time(self): '''Set gauge to the current unixtime.''' @@ -305,8 +322,7 @@ def samples(self): self._samples = types.MethodType(samples, self) def _samples(self): - with self._lock: - return (('', {}, self._value), ) + return (('', {}, self._value.get()), ) @_MetricWrapper @@ -314,16 +330,14 @@ class Summary(object): _type = 'summary' _reserved_labelnames = ['quantile'] - def __init__(self): - self._count = 0.0 - self._sum = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._count = _ValueClass(name + '_count', labelnames, labelvalues) + self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) def observe(self, amount): '''Observe the given amount.''' - with self._lock: - self._count += 1 - self._sum += amount + self._count.inc(1) + self._sum.inc(amount) def time(self): '''Time a block of code or function, and observe the duration in seconds. @@ -352,10 +366,9 @@ def wrapped(*args, **kwargs): return Timer(self) def _samples(self): - with self._lock: - return ( - ('_count', {}, self._count), - ('_sum', {}, self._sum)) + return ( + ('_count', {}, self._count.get()), + ('_sum', {}, self._sum.get())) def _floatToGoString(d): @@ -372,9 +385,8 @@ class Histogram(object): _type = 'histogram' _reserved_labelnames = ['histogram'] - def __init__(self, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): - self._sum = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): + self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) buckets = [float(b) for b in buckets] if buckets != sorted(buckets): # This is probably an error on the part of the user, @@ -385,16 +397,18 @@ def __init__(self, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2 if len(buckets) < 2: raise ValueError('Must have at least two buckets') self._upper_bounds = buckets - self._buckets = [0.0] * len(buckets) + self._buckets = [] + bucket_labelnames = labelnames + ('le',) + for b in buckets: + self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),))) def observe(self, amount): '''Observe the given amount.''' - with self._lock: - self._sum += amount - for i, bound in enumerate(self._upper_bounds): - if amount <= bound: - self._buckets[i] += 1 - break + self._sum.inc(amount) + for i, bound in enumerate(self._upper_bounds): + if amount <= bound: + self._buckets[i].inc(1) + break def time(self): '''Time a block of code or function, and observe the duration in seconds. @@ -423,15 +437,14 @@ def wrapped(*args, **kwargs): return Timer(self) def _samples(self): - with self._lock: - samples = [] - acc = 0 - for i, bound in enumerate(self._upper_bounds): - acc += self._buckets[i] - samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) - samples.append(('_count', {}, acc)) - samples.append(('_sum', {}, self._sum)) - return tuple(samples) + samples = [] + acc = 0 + for i, bound in enumerate(self._upper_bounds): + acc += self._buckets[i].get() + samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) + samples.append(('_count', {}, acc)) + samples.append(('_sum', {}, self._sum.get())) + return tuple(samples) CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8' From d2d88ea1b22dcf96416877a81e0ec31ad999e96f Mon Sep 17 00:00:00 2001 From: Brian Brazil Date: Tue, 5 May 2015 23:37:43 +0100 Subject: [PATCH 8/8] Add multi-process value and collector. --- prometheus_client/__init__.py | 93 +++++++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 29fa38eb..b2d58b06 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -3,9 +3,11 @@ from __future__ import unicode_literals import copy +import glob import re import resource import os +import shelve import time import threading import types @@ -100,22 +102,63 @@ class _MutexValue(object): '''A float protected by a mutex.''' def __init__(self, name, labelnames, labelvalues): - self._value = 0.0 - self._lock = Lock() + self._value = 0.0 + self._lock = Lock() def inc(self, amount): - with self._lock: - self._value += amount + with self._lock: + self._value += amount def set(self, value): - with self._lock: - self._value = value + with self._lock: + self._value = value def get(self): - with self._lock: - return self._value + with self._lock: + return self._value + + +def _MultiProcessValue(): + pid = os.getpid() + filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}.db'.format(pid)) + samples = shelve.open(filename) + + class _ShelveValue(object): + '''A float protected by a mutex backed by a per-process shelve.''' + def __init__(self, name, labelnames, labelvalues): + self._key = repr((name, labelnames, labelvalues)) + self._value = samples.get(self._key, 0.0) + samples[self._key] = self._value + samples.sync() + self._lock = Lock() + + def inc(self, amount): + with self._lock: + self._value += amount + samples[self._key] = self._value + samples.sync() + + def set(self, value): + with self._lock: + self._value = value + samples[self._key] = self._value + samples.sync() -_ValueClass = _MutexValue + def get(self): + with self._lock: + return self._value + + return _ShelveValue + + +# Should we enable multi-process mode? +# This needs to be chosen before the first metric is constructed, +# and as that may be in some arbitrary library the user/admin has +# no control over we use an enviroment variable. +if 'prometheus_multiproc_dir' in os.environ: + _Value = _MultiProcessValue() +else: + _Value = _MutexValue class _LabelWrapper(object): @@ -219,7 +262,7 @@ class Counter(object): _reserved_labelnames = [] def __init__(self, name, labelnames, labelvalues): - self._value = _ValueClass(name, labelnames, labelvalues) + self._value = _Value(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment counter by the given amount.''' @@ -265,7 +308,7 @@ class Gauge(object): _reserved_labelnames = [] def __init__(self, name, labelnames, labelvalues): - self._value = _ValueClass(name, labelnames, labelvalues) + self._value = _Value(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment gauge by the given amount.''' @@ -331,8 +374,8 @@ class Summary(object): _reserved_labelnames = ['quantile'] def __init__(self, name, labelnames, labelvalues): - self._count = _ValueClass(name + '_count', labelnames, labelvalues) - self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) + self._count = _Value(name + '_count', labelnames, labelvalues) + self._sum = _Value(name + '_sum', labelnames, labelvalues) def observe(self, amount): '''Observe the given amount.''' @@ -386,7 +429,7 @@ class Histogram(object): _reserved_labelnames = ['histogram'] def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): - self._sum = _ValueClass(name + '_sum', labelnames, labelvalues) + self._sum = _Value(name + '_sum', labelnames, labelvalues) buckets = [float(b) for b in buckets] if buckets != sorted(buckets): # This is probably an error on the part of the user, @@ -400,7 +443,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, self._buckets = [] bucket_labelnames = labelnames + ('le',) for b in buckets: - self._buckets.append(_ValueClass(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),))) + self._buckets.append(_Value(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),))) def observe(self, amount): '''Observe the given amount.''' @@ -587,6 +630,26 @@ def collect(self): PROCESS_COLLECTOR = ProcessCollector() """Default ProcessCollector in default Registry REGISTRY.""" +class MultiProcessCollector(object): + """Collector for files for multi-process mode.""" + def __init__(self, registry, path=os.environ.get('prometheus_multiproc_dir')): + self._path = path + if registry: + registry.register(self) + + def collect(self): + samples = {} + for f in glob.glob(os.path.join(self._path, '*.db')): + for key, value in shelve.open(f).items(): + samples.setdefault(key, 0.0) + samples[key] += value + metrics = {} + for key in samples: + name, labelnames, labelvalues = eval(key) + metrics.setdefault(name, Metric(name, name, 'untyped')) + metrics[name].add_sample(name, dict(zip(labelnames, labelvalues)), samples[key]) + return metrics.values() + if __name__ == '__main__': c = Counter('cc', 'A counter')