diff --git a/README.md b/README.md index 1721208e..11d720ac 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ with c.count_exceptions(ValueError): Gauges can go up and down. + ```python from prometheus_client import Gauge g = Gauge('my_inprogress_requests', 'Description of gauge') @@ -243,5 +244,21 @@ g.set(1) write_to_textfile('/configured/textfile/path/raid.prom', registry) ``` +A separate registry is used, as the default registry may contain other metrics. + +## Exporting to a Pushgateway + +The [Pushgateway](https://github.com/prometheus/pushgateway) +allows ephemeral and batch jobs to expose their metrics to Prometheus. + +```python +from prometheus_client import CollectorRegistry,Gauge,push_to_gateway +registry = CollectorRegistry() +g = Gauge('job_finished_ok_at', 'last time a batch job successfully finished', registry=registry) +g.set_to_current_time() +push_to_gateway(registry, job='batchA', host='pushgateway.mydomain') +``` + +A separate registry is used, as the default registry may contain other metrics. A separate registry is used, as the default registry may contain other metrics such as those from the Process Collector. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 59af12d2..9b7262d1 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -3,12 +3,22 @@ from __future__ import unicode_literals import copy +import glob import re import resource import os +import shelve import time import threading import types + +try: + from urllib2 import urlopen, quote +except ImportError: + # Python 3 + from urllib.request import urlopen + from urllib.parse import quote + try: from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer @@ -20,6 +30,7 @@ from functools import wraps from threading import Lock + __all__ = ['Counter', 'Gauge', 'Summary', 'Histogram'] # http://stackoverflow.com/questions/19913653/no-unicode-in-all-for-a-packages-init __all__ = [n.encode('ascii') for n in __all__] @@ -96,11 +107,75 @@ def add_sample(self, name, labels, value): self._samples.append((name, labels, value)) +class _MutexValue(object): + '''A float protected by a mutex.''' + + def __init__(self, name, labelnames, labelvalues): + self._value = 0.0 + self._lock = Lock() + + def inc(self, amount): + with self._lock: + self._value += amount + + def set(self, value): + with self._lock: + self._value = value + + def get(self): + with self._lock: + return self._value + + +def _MultiProcessValue(): + pid = os.getpid() + filename = os.path.join(os.environ['prometheus_multiproc_dir'], '{0}.db'.format(pid)) + samples = shelve.open(filename) + + class _ShelveValue(object): + '''A float protected by a mutex backed by a per-process shelve.''' + def __init__(self, name, labelnames, labelvalues): + self._key = repr((name, labelnames, labelvalues)) + self._value = samples.get(self._key, 0.0) + samples[self._key] = self._value + samples.sync() + self._lock = Lock() + + def inc(self, amount): + with self._lock: + self._value += amount + samples[self._key] = self._value + samples.sync() + + def set(self, value): + with self._lock: + self._value = value + samples[self._key] = self._value + samples.sync() + + def get(self): + with self._lock: + return self._value + + return _ShelveValue + + +# Should we enable multi-process mode? +# This needs to be chosen before the first metric is constructed, +# and as that may be in some arbitrary library the user/admin has +# no control over we use an enviroment variable. +if 'prometheus_multiproc_dir' in os.environ: + _Value = _MultiProcessValue() +else: + _Value = _MutexValue + + class _LabelWrapper(object): '''Handles labels for the wrapped metric.''' - def __init__(self, wrappedClass, labelnames, **kwargs): + def __init__(self, wrappedClass, name, labelnames, **kwargs): self._wrappedClass = wrappedClass self._type = wrappedClass._type + self._name = name self._labelnames = labelnames self._kwargs = kwargs self._lock = Lock() @@ -130,7 +205,7 @@ def labels(self, *labelvalues): labelvalues = tuple([unicode(l) for l in labelvalues]) with self._lock: if labelvalues not in self._metrics: - self._metrics[labelvalues] = self._wrappedClass(**self._kwargs) + self._metrics[labelvalues] = self._wrappedClass(self._name, self._labelnames, labelvalues, **self._kwargs) return self._metrics[labelvalues] def remove(self, *labelvalues): @@ -153,7 +228,15 @@ def _samples(self): def _MetricWrapper(cls): '''Provides common functionality for metrics.''' def init(name, documentation, labelnames=(), namespace='', subsystem='', registry=REGISTRY, **kwargs): + full_name = '' + if namespace: + full_name += namespace + '_' + if subsystem: + full_name += subsystem + '_' + full_name += name + if labelnames: + labelnames = tuple(labelnames) for l in labelnames: if not _METRIC_LABEL_NAME_RE.match(l): raise ValueError('Invalid label metric name: ' + l) @@ -161,16 +244,9 @@ def init(name, documentation, labelnames=(), namespace='', subsystem='', registr raise ValueError('Reserved label metric name: ' + l) if l in cls._reserved_labelnames: raise ValueError('Reserved label metric name: ' + l) - collector = _LabelWrapper(cls, labelnames, **kwargs) + collector = _LabelWrapper(cls, name, labelnames, **kwargs) else: - collector = cls(**kwargs) - - full_name = '' - if namespace: - full_name += namespace + '_' - if subsystem: - full_name += subsystem + '_' - full_name += name + collector = cls(name, labelnames, (), **kwargs) if not _METRIC_NAME_RE.match(full_name): raise ValueError('Invalid metric name: ' + full_name) @@ -194,16 +270,14 @@ class Counter(object): _type = 'counter' _reserved_labelnames = [] - def __init__(self): - self._value = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._value = _Value(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment counter by the given amount.''' if amount < 0: raise ValueError('Counters can only be incremented by non-negative amounts.') - with self._lock: - self._value += amount + self._value.inc(amount) def count_exceptions(self, exception=Exception): '''Count exceptions in a block of code or function. @@ -234,8 +308,7 @@ def wrapped(*args, **kwargs): return ExceptionCounter(self) def _samples(self): - with self._lock: - return (('', {}, self._value), ) + return (('', {}, self._value.get()), ) @_MetricWrapper @@ -243,24 +316,20 @@ class Gauge(object): _type = 'gauge' _reserved_labelnames = [] - def __init__(self): - self._value = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._value = _Value(name, labelnames, labelvalues) def inc(self, amount=1): '''Increment gauge by the given amount.''' - with self._lock: - self._value += amount + self._value.inc(amount) def dec(self, amount=1): '''Decrement gauge by the given amount.''' - with self._lock: - self._value -= amount + self._value.inc(-amount) def set(self, value): '''Set gauge to the given value.''' - with self._lock: - self._value = float(value) + self._value.set(float(value)) def set_to_current_time(self): '''Set gauge to the current unixtime.''' @@ -305,8 +374,7 @@ def samples(self): self._samples = types.MethodType(samples, self) def _samples(self): - with self._lock: - return (('', {}, self._value), ) + return (('', {}, self._value.get()), ) @_MetricWrapper @@ -314,16 +382,14 @@ class Summary(object): _type = 'summary' _reserved_labelnames = ['quantile'] - def __init__(self): - self._count = 0.0 - self._sum = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues): + self._count = _Value(name + '_count', labelnames, labelvalues) + self._sum = _Value(name + '_sum', labelnames, labelvalues) def observe(self, amount): '''Observe the given amount.''' - with self._lock: - self._count += 1 - self._sum += amount + self._count.inc(1) + self._sum.inc(amount) def time(self): '''Time a block of code or function, and observe the duration in seconds. @@ -352,10 +418,9 @@ def wrapped(*args, **kwargs): return Timer(self) def _samples(self): - with self._lock: - return ( - ('_count', {}, self._count), - ('_sum', {}, self._sum)) + return ( + ('_count', {}, self._count.get()), + ('_sum', {}, self._sum.get())) def _floatToGoString(d): @@ -372,9 +437,8 @@ class Histogram(object): _type = 'histogram' _reserved_labelnames = ['histogram'] - def __init__(self, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): - self._sum = 0.0 - self._lock = Lock() + def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, _INF)): + self._sum = _Value(name + '_sum', labelnames, labelvalues) buckets = [float(b) for b in buckets] if buckets != sorted(buckets): # This is probably an error on the part of the user, @@ -385,16 +449,18 @@ def __init__(self, buckets=(.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2 if len(buckets) < 2: raise ValueError('Must have at least two buckets') self._upper_bounds = buckets - self._buckets = [0.0] * len(buckets) + self._buckets = [] + bucket_labelnames = labelnames + ('le',) + for b in buckets: + self._buckets.append(_Value(name + '_bucket', bucket_labelnames, labelvalues + (_floatToGoString(b),))) def observe(self, amount): '''Observe the given amount.''' - with self._lock: - self._sum += amount - for i, bound in enumerate(self._upper_bounds): - if amount <= bound: - self._buckets[i] += 1 - break + self._sum.inc(amount) + for i, bound in enumerate(self._upper_bounds): + if amount <= bound: + self._buckets[i].inc(1) + break def time(self): '''Time a block of code or function, and observe the duration in seconds. @@ -423,15 +489,14 @@ def wrapped(*args, **kwargs): return Timer(self) def _samples(self): - with self._lock: - samples = [] - acc = 0 - for i, bound in enumerate(self._upper_bounds): - acc += self._buckets[i] - samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) - samples.append(('_count', {}, acc)) - samples.append(('_sum', {}, self._sum)) - return tuple(samples) + samples = [] + acc = 0 + for i, bound in enumerate(self._upper_bounds): + acc += self._buckets[i].get() + samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) + samples.append(('_count', {}, acc)) + samples.append(('_sum', {}, self._sum.get())) + return tuple(samples) CONTENT_TYPE_LATEST = 'text/plain; version=0.0.4; charset=utf-8' @@ -467,6 +532,36 @@ def do_GET(self): def log_message(self, format, *args): return +def build_pushgateway_url(job, instance=None, host='localhost', port=9091): + ''' + Build a valid pushgateway url + ''' + + if instance: + instancestr = '/instances/{}'.format(instance) + else: + instancestr = '' + + url = 'http://{}:{}/metrics/jobs/{}{}'.format(host, port, + quote(job), + quote(instancestr)) + return url + + +def push_to_gateway_url(url, registry, timeout=None): + '''Push metrics to the given url''' + + resp = urlopen(url, data=generate_latest(registry), timeout=timeout) + if resp.code >= 400: + raise IOError("error pushing to pushgateway: {0} {1}".format( + resp.code, resp.msg)) + + +def push_to_gateway(registry, job, instance=None, host='localhost', port=9091, timeout=None): + '''Push metrics to a pushgateway''' + + url = build_pushgateway_url(job, instance, host, port) + push_to_gateway_url(url, registry, timeout) def start_http_server(port, addr=''): """Starts a HTTP server for prometheus metrics as a daemon thread.""" @@ -574,6 +669,26 @@ def collect(self): PROCESS_COLLECTOR = ProcessCollector() """Default ProcessCollector in default Registry REGISTRY.""" +class MultiProcessCollector(object): + """Collector for files for multi-process mode.""" + def __init__(self, registry, path=os.environ.get('prometheus_multiproc_dir')): + self._path = path + if registry: + registry.register(self) + + def collect(self): + samples = {} + for f in glob.glob(os.path.join(self._path, '*.db')): + for key, value in shelve.open(f).items(): + samples.setdefault(key, 0.0) + samples[key] += value + metrics = {} + for key in samples: + name, labelnames, labelvalues = eval(key) + metrics.setdefault(name, Metric(name, name, 'untyped')) + metrics[name].add_sample(name, dict(zip(labelnames, labelvalues)), samples[key]) + return metrics.values() + if __name__ == '__main__': c = Counter('cc', 'A counter') diff --git a/tests/test_client.py b/tests/test_client.py index d28ed7a6..0f06ae03 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -5,6 +5,7 @@ from prometheus_client import Gauge, Counter, Summary, Histogram from prometheus_client import CollectorRegistry, generate_latest, ProcessCollector +from prometheus_client import build_pushgateway_url class TestCounter(unittest.TestCase): @@ -361,6 +362,24 @@ def test_working_fake_pid(self): self.assertEqual(None, self.registry.get_sample_value('process_open_fds')) self.assertEqual(None, self.registry.get_sample_value('process_fake_namespace')) +class TestBuildPushgatewayUrl(unittest.TestCase): + def test_job_instance(self): + expected = 'http://localhost:9091/metrics/jobs/foojob/instances/fooinstance' + + url = build_pushgateway_url('foojob', 'fooinstance') + self.assertEqual(url, expected) + + def test_host_port(self): + expected = 'http://foohost:9092/metrics/jobs/foojob' + + url = build_pushgateway_url('foojob', host='foohost', port=9092) + self.assertEqual(url, expected) + + def test_url_escaping(self): + expected = 'http://localhost:9091/metrics/jobs/foo%20job' + + url = build_pushgateway_url('foo job') + self.assertEqual(url, expected)