diff --git a/README.md b/README.md index a838d309..c834f4fa 100644 --- a/README.md +++ b/README.md @@ -1,542 +1,38 @@ # Prometheus Python Client -The official Python 2 and 3 client for [Prometheus](http://prometheus.io). +This prometheus client has ability to store all metrics in django cache. +Just set environment variable "prometheus_django_cache" to django cache name, like "default". -## Three Step Demo +Gauge has new mode "last" - only last value is collected. -**One**: Install the client: -``` -pip install prometheus_client -``` +## Internals +Each process has separate metrics storage in one cache entry. +Each process is identified by hostname+pid. -**Two**: Paste the following into a Python interpreter: -```python -from prometheus_client import start_http_server, Summary -import random -import time +To allow collector to find all instances that stored any metrics there is global list of all available hostname+pid identifiers stored in cache. Read+Write access to that list is synchronized by distributed lock based on cache. +Process added to that list just once, when first metric value is stored. -# Create a metric to track time spent and requests made. -REQUEST_TIME = Summary('request_processing_seconds', 'Time spent processing request') +Gauge modes work like with "prometheus_multiproc_dir", but mode "all" is working like "liveall". -# Decorate function with metric. -@REQUEST_TIME.time() -def process_request(t): - """A dummy function that takes some time.""" - time.sleep(t) +distributed.mark_process_dead works like multiprocess.mark_process_dead but removes all gauge modes including mode "all". -if __name__ == '__main__': - # Start up the server to expose the metrics. - start_http_server(8000) - # Generate some requests. - while True: - process_request(random.random()) -``` +Metric storing process has no locking mechanism for read and write because metrics storage is per process with assumption that there is no theading. 
-**Three**: Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics.
+The adapted multiprocess tests are working.
-From one easy to use decorator you get:
- * `request_processing_seconds_count`: Number of times this function was called.
- * `request_processing_seconds_sum`: Total amount of time spent in this function.
+## Advantages
+* any distributed application will work out of the box
+* no FD leaking like in multiprocessing mode with uwsgi
+* no files stored on disk
+* theoretically any django cache backend will work (but I have had some trouble with memcached in practice)
+* because of the django cache API's simplicity, any storage can be simply plugged in
-Prometheus's `rate` function allows calculation of both requests per second,
-and latency over time from this data.
+## Disadvantages and things to do
+* metrics stored in cache have a ttl (5 days for now) and old counters can be evicted, so a counter value can decrease (ttl refreshing is planned, but the django 1.7 cache API does not support the touch method).
+* performance may suffer because of network overhead (no performance tests have been done so far)
+* the hostname+pid list has no cleaning process and can grow without bound
+* just testing it in production, it's an early alpha version
+* I plan to test it only with the redis backend
-In addition if you're on Linux the `process` metrics expose CPU, memory and
-other information about the process for free!
-
-## Installation
-
-```
-pip install prometheus_client
-```
-
-This package can be found on
-[PyPI](https://pypi.python.org/pypi/prometheus_client).
-
-## Instrumenting
-
-Four types of metric are offered: Counter, Gauge, Summary and Histogram.
-See the documentation on [metric types](http://prometheus.io/docs/concepts/metric_types/)
-and [instrumentation best practices](http://prometheus.io/docs/practices/instrumentation/#counter-vs.-gauge,-summary-vs.-histogram)
-on how to use them.
-
-### Counter
-
-Counters go up, and reset when the process restarts.
- - -```python -from prometheus_client import Counter -c = Counter('my_failures', 'Description of counter') -c.inc() # Increment by 1 -c.inc(1.6) # Increment by given value -``` - -If there is a suffix of `_total` on the metric name, it will be removed. When -exposing the time series for counter, a `_total` suffix will be added. This is -for compatibility between OpenMetrics and the Prometheus text format, as OpenMetrics -requires the `_total` suffix. - -There are utilities to count exceptions raised: - -```python -@c.count_exceptions() -def f(): - pass - -with c.count_exceptions(): - pass - -# Count only one type of exception -with c.count_exceptions(ValueError): - pass -``` - -### Gauge - -Gauges can go up and down. - -```python -from prometheus_client import Gauge -g = Gauge('my_inprogress_requests', 'Description of gauge') -g.inc() # Increment by 1 -g.dec(10) # Decrement by given value -g.set(4.2) # Set to a given value -``` - -There are utilities for common use cases: - -```python -g.set_to_current_time() # Set to current unixtime - -# Increment when entered, decrement when exited. -@g.track_inprogress() -def f(): - pass - -with g.track_inprogress(): - pass -``` - -A Gauge can also take its value from a callback: - -```python -d = Gauge('data_objects', 'Number of objects') -my_dict = {} -d.set_function(lambda: len(my_dict)) -``` - -### Summary - -Summaries track the size and number of events. - -```python -from prometheus_client import Summary -s = Summary('request_latency_seconds', 'Description of summary') -s.observe(4.7) # Observe 4.7 (seconds in this case) -``` - -There are utilities for timing code: - -```python -@s.time() -def f(): - pass - -with s.time(): - pass -``` - -The Python client doesn't store or expose quantile information at this time. - -### Histogram - -Histograms track the size and number of events in buckets. -This allows for aggregatable calculation of quantiles. 
- -```python -from prometheus_client import Histogram -h = Histogram('request_latency_seconds', 'Description of histogram') -h.observe(4.7) # Observe 4.7 (seconds in this case) -``` - -The default buckets are intended to cover a typical web/rpc request from milliseconds to seconds. -They can be overridden by passing `buckets` keyword argument to `Histogram`. - -There are utilities for timing code: - -```python -@h.time() -def f(): - pass - -with h.time(): - pass -``` - -### Info - -Info tracks key-value information, usually about a whole target. - -```python -from prometheus_client import Info -i = Info('my_build_version', 'Description of info') -i.info({'version': '1.2.3', 'buildhost': 'foo@bar'}) -``` - -### Enum - -Enum tracks which of a set of states something is currently in. - -```python -from prometheus_client import Enum -e = Enum('my_task_state', 'Description of enum', - states=['starting', 'running', 'stopped']) -e.state('running') -``` - -### Labels - -All metrics can have labels, allowing grouping of related time series. - -See the best practices on [naming](http://prometheus.io/docs/practices/naming/) -and [labels](http://prometheus.io/docs/practices/instrumentation/#use-labels). - -Taking a counter as an example: - -```python -from prometheus_client import Counter -c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) -c.labels('get', '/').inc() -c.labels('post', '/submit').inc() -``` - -Labels can also be passed as keyword-arguments: - -```python -from prometheus_client import Counter -c = Counter('my_requests_total', 'HTTP Failures', ['method', 'endpoint']) -c.labels(method='get', endpoint='/').inc() -c.labels(method='post', endpoint='/submit').inc() -``` - -### Process Collector - -The Python client automatically exports metrics about process CPU usage, RAM, -file descriptors and start time. These all have the prefix `process`, and -are only currently available on Linux. 
- -The namespace and pid constructor arguments allows for exporting metrics about -other processes, for example: -``` -ProcessCollector(namespace='mydaemon', pid=lambda: open('/var/run/daemon.pid').read()) -``` - -### Platform Collector - -The client also automatically exports some metadata about Python. If using Jython, -metadata about the JVM in use is also included. This information is available as -labels on the `python_info` metric. The value of the metric is 1, since it is the -labels that carry information. - -## Exporting - -There are several options for exporting metrics. - -### HTTP - -Metrics are usually exposed over HTTP, to be read by the Prometheus server. - -The easiest way to do this is via `start_http_server`, which will start a HTTP -server in a daemon thread on the given port: - -```python -from prometheus_client import start_http_server - -start_http_server(8000) -``` - -Visit [http://localhost:8000/](http://localhost:8000/) to view the metrics. - -To add Prometheus exposition to an existing HTTP server, see the `MetricsHandler` class -which provides a `BaseHTTPRequestHandler`. It also serves as a simple example of how -to write a custom endpoint. - -#### Twisted - -To use prometheus with [twisted](https://twistedmatrix.com/), there is `MetricsResource` which exposes metrics as a twisted resource. - -```python -from prometheus_client.twisted import MetricsResource -from twisted.web.server import Site -from twisted.web.resource import Resource -from twisted.internet import reactor - -root = Resource() -root.putChild(b'metrics', MetricsResource()) - -factory = Site(root) -reactor.listenTCP(8000, factory) -reactor.run() -``` - -#### WSGI - -To use Prometheus with [WSGI](http://wsgi.readthedocs.org/en/latest/), there is -`make_wsgi_app` which creates a WSGI application. 
- -```python -from prometheus_client import make_wsgi_app -from wsgiref.simple_server import make_server - -app = make_wsgi_app() -httpd = make_server('', 8000, app) -httpd.serve_forever() -``` - -Such an application can be useful when integrating Prometheus metrics with WSGI -apps. - -The method `start_wsgi_server` can be used to serve the metrics through the -WSGI reference implementation in a new thread. - -```python -from prometheus_client import start_wsgi_server - -start_wsgi_server(8000) -``` - -#### Flask - -To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example. - -Save the snippet below in a `myapp.py` file - -```python -from flask import Flask -from werkzeug.wsgi import DispatcherMiddleware -from prometheus_client import make_wsgi_app - -# Create my app -app = Flask(__name__) - -# Add prometheus wsgi middleware to route /metrics requests -app_dispatch = DispatcherMiddleware(app, { - '/metrics': make_wsgi_app() -}) -``` - -Run the example web application like this - -```bash -# Install uwsgi if you do not have it -pip install uwsgi -uwsgi --http 127.0.0.1:8000 --wsgi-file myapp.py --callable app_dispatch -``` - -Visit http://localhost:8000/metrics to see the metrics - -### Node exporter textfile collector - -The [textfile collector](https://github.com/prometheus/node_exporter#textfile-collector) -allows machine-level statistics to be exported out via the Node exporter. - -This is useful for monitoring cronjobs, or for writing cronjobs to expose metrics -about a machine system that the Node exporter does not support or would not make sense -to perform at every scrape (for example, anything involving subprocesses). 
- -```python -from prometheus_client import CollectorRegistry, Gauge, write_to_textfile - -registry = CollectorRegistry() -g = Gauge('raid_status', '1 if raid array is okay', registry=registry) -g.set(1) -write_to_textfile('/configured/textfile/path/raid.prom', registry) -``` - -A separate registry is used, as the default registry may contain other metrics -such as those from the Process Collector. - -## Exporting to a Pushgateway - -The [Pushgateway](https://github.com/prometheus/pushgateway) -allows ephemeral and batch jobs to expose their metrics to Prometheus. - -```python -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -registry = CollectorRegistry() -g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) -g.set_to_current_time() -push_to_gateway('localhost:9091', job='batchA', registry=registry) -``` - -A separate registry is used, as the default registry may contain other metrics -such as those from the Process Collector. - -Pushgateway functions take a grouping key. `push_to_gateway` replaces metrics -with the same grouping key, `pushadd_to_gateway` only replaces metrics with the -same name and grouping key and `delete_from_gateway` deletes metrics with the -given job and grouping key. See the -[Pushgateway documentation](https://github.com/prometheus/pushgateway/blob/master/README.md) -for more information. - -`instance_ip_grouping_key` returns a grouping key with the instance label set -to the host's IP address. - -### Handlers for authentication - -If the push gateway you are connecting to is protected with HTTP Basic Auth, -you can use a special handler to set the Authorization header. 
- -```python -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from prometheus_client.exposition import basic_auth_handler - -def my_auth_handler(url, method, timeout, headers, data): - username = 'foobar' - password = 'secret123' - return basic_auth_handler(url, method, timeout, headers, data, username, password) -registry = CollectorRegistry() -g = Gauge('job_last_success_unixtime', 'Last time a batch job successfully finished', registry=registry) -g.set_to_current_time() -push_to_gateway('localhost:9091', job='batchA', registry=registry, handler=my_auth_handler) -``` - -## Bridges - -It is also possible to expose metrics to systems other than Prometheus. -This allows you to take advantage of Prometheus instrumentation even -if you are not quite ready to fully transition to Prometheus yet. - -### Graphite - -Metrics are pushed over TCP in the Graphite plaintext format. - -```python -from prometheus_client.bridge.graphite import GraphiteBridge - -gb = GraphiteBridge(('graphite.your.org', 2003)) -# Push once. -gb.push() -# Push every 10 seconds in a daemon thread. -gb.start(10.0) -``` - -## Custom Collectors - -Sometimes it is not possible to directly instrument code, as it is not -in your control. This requires you to proxy metrics from other systems. - -To do so you need to create a custom collector, for example: - -```python -from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily, REGISTRY - -class CustomCollector(object): - def collect(self): - yield GaugeMetricFamily('my_gauge', 'Help text', value=7) - c = CounterMetricFamily('my_counter_total', 'Help text', labels=['foo']) - c.add_metric(['bar'], 1.7) - c.add_metric(['baz'], 3.8) - yield c - -REGISTRY.register(CustomCollector()) -``` - -`SummaryMetricFamily` and `HistogramMetricFamily` work similarly. - -A collector may implement a `describe` method which returns metrics in the same -format as `collect` (though you don't have to include the samples). 
This is -used to predetermine the names of time series a `CollectorRegistry` exposes and -thus to detect collisions and duplicate registrations. - -Usually custom collectors do not have to implement `describe`. If `describe` is -not implemented and the CollectorRegistry was created with `auto_desribe=True` -(which is the case for the default registry) then `collect` will be called at -registration time instead of `describe`. If this could cause problems, either -implement a proper `describe`, or if that's not practical have `describe` -return an empty list. - - -## Multiprocess Mode (Gunicorn) - -Prometheus client libaries presume a threaded model, where metrics are shared -across workers. This doesn't work so well for languages such as Python where -it's common to have processes rather than threads to handle large workloads. - -To handle this the client library can be put in multiprocess mode. -This comes with a number of limitations: - -- Registries can not be used as normal, all instantiated metrics are exported -- Custom collectors do not work (e.g. cpu and memory metrics) -- Info and Enum metrics do not work -- The pushgateway cannot be used -- Gauges cannot use the `pid` label - -There's several steps to getting this working: - -**One**: Gunicorn deployment - -The `prometheus_multiproc_dir` environment variable must be set to a directory -that the client library can use for metrics. This directory must be wiped -between Gunicorn runs (before startup is recommended). - -Put the following in the config file: -```python -from prometheus_client import multiprocess - -def child_exit(server, worker): - multiprocess.mark_process_dead(worker.pid) -``` - -**Two**: Inside the application -```python -from prometheus_client import multiprocess -from prometheus_client import generate_latest, CollectorRegistry, CONTENT_TYPE_LATEST, Gauge - -# Example gauge. -IN_PROGRESS = Gauge("inprogress_requests", "help", multiprocess_mode='livesum') - - -# Expose metrics. 
-@IN_PROGRESS.track_inprogress() -def app(environ, start_response): - registry = CollectorRegistry() - multiprocess.MultiProcessCollector(registry) - data = generate_latest(registry) - status = '200 OK' - response_headers = [ - ('Content-type', CONTENT_TYPE_LATEST), - ('Content-Length', str(len(data))) - ] - start_response(status, response_headers) - return iter([data]) -``` - -**Three**: Instrumentation - -Counters, Summarys and Histograms work as normal. - -Gauges have several modes they can run in, which can be selected with the -`multiprocess_mode` parameter. - -- 'all': Default. Return a timeseries per process alive or dead. -- 'liveall': Return a timeseries per process that is still alive. -- 'livesum': Return a single timeseries that is the sum of the values of alive processes. -- 'max': Return a single timeseries that is the maximum of the values of all processes, alive or dead. -- 'min': Return a single timeseries that is the minimum of the values of all processes, alive or dead. - -## Parser - -The Python client supports parsing the Prometheus text format. -This is intended for advanced use cases where you have servers -exposing Prometheus metrics and need to get them into some other -system. 
- -```python -from prometheus_client.parser import text_string_to_metric_families -for family in text_string_to_metric_families(u"my_gauge 1.0\n"): - for sample in family.samples: - print("Name: {0} Labels: {1} Value: {2}".format(*sample)) -``` +## Requirements +* Django 1.7+ supported \ No newline at end of file diff --git a/prometheus_client/core.py b/prometheus_client/core.py index 964947a8..217ac787 100644 --- a/prometheus_client/core.py +++ b/prometheus_client/core.py @@ -2,7 +2,6 @@ from __future__ import unicode_literals -from collections import namedtuple import copy import json import math @@ -11,10 +10,11 @@ import re import struct import sys -from threading import Lock import time -from timeit import default_timer import types +from collections import namedtuple +from threading import Lock +from timeit import default_timer from .decorator import decorate @@ -653,7 +653,6 @@ def _mmap_key(metric_name, name, labelnames, labelvalues): labels = dict(zip(labelnames, labelvalues)) return json.dumps([metric_name, name, labels], sort_keys=True) - def _MultiProcessValue(_pidFunc=os.getpid): files = {} values = [] @@ -727,6 +726,10 @@ def get(self): # no control over we use an environment variable. 
if 'prometheus_multiproc_dir' in os.environ: _ValueClass = _MultiProcessValue() +elif 'prometheus_django_cache' in os.environ: + from prometheus_client.distributed import DistributedValue + + _ValueClass = DistributedValue else: _ValueClass = _MutexValue @@ -968,11 +971,12 @@ def f(): ''' _type = 'gauge' _reserved_labelnames = [] - _MULTIPROC_MODES = frozenset(('min', 'max', 'livesum', 'liveall', 'all')) + _MULTIPROC_MODES = frozenset(('min', 'max', 'livesum', 'liveall', 'all', 'last')) def __init__(self, name, labelnames, labelvalues, multiprocess_mode='all'): if (_ValueClass._multiprocess and - multiprocess_mode not in self._MULTIPROC_MODES): + (multiprocess_mode not in self._MULTIPROC_MODES or multiprocess_mode == 'last')): + raise ValueError('Invalid multiprocess mode: ' + multiprocess_mode) self._value = _ValueClass( self._type, name, name, labelnames, labelvalues, @@ -1089,7 +1093,7 @@ def _floatToGoString(d): return '+Inf' elif d == _MINUS_INF: return '-Inf' - elif math.isnan(d): + elif d is None or math.isnan(d): return 'NaN' else: return repr(float(d)) @@ -1177,7 +1181,9 @@ def _samples(self): samples = [] acc = 0 for i, bound in enumerate(self._upper_bounds): - acc += self._buckets[i].get() + v = self._buckets[i].get() + if v is not None: + acc += v samples.append(('_bucket', {'le': _floatToGoString(bound)}, acc)) samples.append(('_count', {}, acc)) samples.append(('_sum', {}, self._sum.get())) diff --git a/prometheus_client/distributed.py b/prometheus_client/distributed.py new file mode 100644 index 00000000..de53170c --- /dev/null +++ b/prometheus_client/distributed.py @@ -0,0 +1,330 @@ +#!/usr/bin/python + +from __future__ import unicode_literals + +from random import randint + +import gc +import inspect +import json +import os +import socket +import time +from collections import defaultdict +from contextlib import contextmanager +from threading import Lock + +from django.core.cache import caches + +cache = 
caches[os.environ.get('prometheus_django_cache', 'default')]
+
+hostname = socket.gethostname()
+
+# Id of the cache lock currently held by this process (None when not locked).
+in_lock = None
+
+lock = Lock()
+
+
+class FakeSuccessContextManager(object):
+    def __nonzero__(self):
+        return True
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        pass
+
+
+class CacheLock(object):
+    # Best-effort distributed lock built on the atomicity of cache.add():
+    # only one process can add the key, holders delete it on release, and
+    # the ttl bounds how long a crashed holder can block others.
+    def __init__(self, lock_id, ttl):
+        self.id = 'cachelock-{0}'.format(lock_id)
+        self.ttl = ttl
+        self.status = False
+        self.timeout_at = None
+
+    def __nonzero__(self):
+        return self.status
+
+    def __enter__(self):
+        # BUGFIX: without this declaration the assignment below created a
+        # dead local and the module-level in_lock was never set on acquire
+        # (the matching __exit__ already declares it global).
+        global in_lock
+        trys = 40
+        while trys:
+            self.timeout_at = time.monotonic() + self.ttl
+            self.status = cache.add(self.id, 'locked', self.ttl)
+            if self.status:
+                in_lock = self.id
+                return self.status
+            # BUGFIX: divide by a float so Python 2 integer division does not
+            # truncate the backoff to 0 and busy-spin on the cache.
+            time.sleep(randint(1, 10) / 10.0)
+            trys -= 1
+        raise Exception('Could not lock for {self.id}'.format(**locals()))
+
+    def __exit__(self, type, value, tb):
+        global in_lock
+        in_lock = None
+        if self.status:
+            # Only delete if the lock ttl has not expired; past timeout_at the
+            # key may already belong to another process.
+            if time.monotonic() < self.timeout_at:
+                cache.delete(self.id)
+                if self.id in cache:
+                    raise Exception('Id in cache ' + self.id)
+            self.status = False
+
+
+distributed_list_cache_key = 'pc_distributed_list'
+distributed_list_lock = CacheLock(distributed_list_cache_key, ttl=20)
+added_to_distributed_list = set()
+
+distributed_list_ttl_minutes = 60 * 24 * 5
+distributed_value_ttl_minutes = 60 * 24 * 5
+
+
+
+@contextmanager
+def gc_disabled():
+    # Temporarily disable the garbage collector around cache round-trips.
+    was_enabled_previously = gc.isenabled()
+    gc.disable()
+    # BUGFIX: re-enable GC even if the body raises; previously an exception
+    # inside the with-block left the collector disabled for good.
+    try:
+        yield
+    finally:
+        if was_enabled_previously:
+            gc.enable()
+
+
+def add_to_distributed_list(pid):
+    # Register (hostname, pid) in the shared instance list exactly once per
+    # process; the local added_to_distributed_list set avoids repeat locking.
+    if not (hostname, pid) in added_to_distributed_list:
+        with gc_disabled():
+            with distributed_list_lock:
+                l = cache.get(distributed_list_cache_key, set())
+                l.add((hostname, pid))
+                cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes * 60)
+        added_to_distributed_list.add((hostname, pid))
+
+
+def remove_from_distributed_list(pid):
+    with gc_disabled():
+        with distributed_list_lock:
+            l = cache.get(distributed_list_cache_key, set())
+
+            if 
(hostname, pid) in l: + l.remove((hostname, pid)) + if (hostname, pid) in added_to_distributed_list: + added_to_distributed_list.remove((hostname, pid)) + + cache.set(distributed_list_cache_key, l, distributed_list_ttl_minutes * 60) + + +_pidFunc = os.getpid + +__cached_get_all_typ = None + + +def get_all_typ(): + from . import core + + global __cached_get_all_typ + if __cached_get_all_typ is None: + __cached_get_all_typ = [] + for name, obj in inspect.getmembers(core): + if inspect.isclass(obj): + typ = getattr(obj, '_type', None) + if typ: + __cached_get_all_typ.append(typ) + else: + wrapper = getattr(obj, '__wrapped__', None) + if wrapper: + typ = getattr(wrapper, '_type', None) + if typ: + __cached_get_all_typ.append(typ) + __cached_get_all_typ.extend([core.Gauge.__wrapped__._type + '_' + mode for mode in core.Gauge.__wrapped__._MULTIPROC_MODES]) + return __cached_get_all_typ + + +prometheus_cache_key_prefix = 'pcachekey.' + + +def get_cache_key(typ, hostname, pid, prometheus_cache_key_prefix=prometheus_cache_key_prefix): + return '{prometheus_cache_key_prefix}{typ};{hostname};{pid}'.format(**locals()) + + +def deconstruct_cache_key(cache_key): + l = cache_key.replace(prometheus_cache_key_prefix, '').split(';') + return l + + +class DistributedValue(object): + _multiprocess = False + + def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs): + if typ == 'gauge': + # if multiprocess_mode == 'all': + # raise Exception('multiprocess_mode=all not supported in distributed storage') + typ_prefix = typ + '_' + multiprocess_mode + else: + typ_prefix = typ + self.typ_prefix = typ_prefix + from . 
import core + + self.valuekey = core._mmap_key(metric_name, name, labelnames, labelvalues) + self.__reset() + + @property + def cachekey(self): + pid = _pidFunc() + add_to_distributed_list(pid) + return get_cache_key(self.typ_prefix, hostname, pid) + + def __get_dict(self): + return cache.get(self.cachekey, {}) + + def __set_dict(self, dict_value): + cache.set(self.cachekey, dict_value, distributed_value_ttl_minutes*60) + + def __reset(self): + ts = int(time.time()) + d = self.__get_dict() + if not self.valuekey in d: + d[self.valuekey] = (0, ts) + self.__set_dict(d) + + def inc(self, amount): + ts = int(time.time()) + d = self.__get_dict() + v = d.get(self.valuekey, 0) + if isinstance(v, tuple): + v = v[0] + d[self.valuekey] = (v + amount, ts) + self.__set_dict(d) + + def set(self, value): + ts = int(time.time()) + d = self.__get_dict() + d[self.valuekey] = (value, ts) + self.__set_dict(d) + + def get(self): + # with lock: + v = self.__get_dict().get(self.valuekey, None) + if isinstance(v, tuple): + v = v[0] + return v + + +class DistributedCollector(object): + def __init__(self, registry): + if registry: + registry.register(self) + self.accumulate = True + + def collect(self): + cache_keys = set() + distributed_list = cache.get(distributed_list_cache_key, set()) + for hostname, pid in distributed_list: + for typ in get_all_typ(): + cache_keys.add(get_cache_key(typ, hostname, pid)) + + return self.merge(cache.get_many(cache_keys), accumulate=self.accumulate) + + def merge(self, cache_values, accumulate=True): + """Merge metrics from given mmap files. + + By default, histograms are accumulated, as per prometheus wire format. + But if writing the merged data back to mmap files, use + accumulate=False to avoid compound accumulation. + """ + from . 
class DistributedCollector(object):
    """Collector that merges metrics from every known hostname+pid cache entry."""

    def __init__(self, registry):
        if registry:
            registry.register(self)
        # Accumulate histogram buckets (prometheus wire format) by default.
        self.accumulate = True

    def collect(self):
        """Fetch every per-process cache entry in one round trip and merge them."""
        cache_keys = set()
        distributed_list = cache.get(distributed_list_cache_key, set())
        for value_hostname, pid in distributed_list:
            for typ in get_all_typ():
                cache_keys.add(get_cache_key(typ, value_hostname, pid))

        return self.merge(cache.get_many(cache_keys), accumulate=self.accumulate)

    def merge(self, cache_values, accumulate=True):
        """Merge metrics from the given cache entries.

        ``cache_values`` maps cache keys (see ``get_cache_key``) to the
        per-process value dicts written by ``DistributedValue``.

        By default, histograms are accumulated, as per prometheus wire
        format.  But if writing the merged data back to the cache, use
        accumulate=False to avoid compound accumulation.
        """
        from . import core

        # Pass 1: turn raw cache entries into Metric objects with one
        # sample per (process, label set).
        metrics = {}
        for cache_key, cache_value in cache_values.items():
            typ, value_hostname, pid = deconstruct_cache_key(cache_key)
            multiprocess_mode = None
            if '_' in typ:
                # Gauges are stored as "gauge_<mode>"; split only on the
                # first underscore so modes themselves may contain one.
                typ, multiprocess_mode = typ.split('_', 1)
            for key, value in cache_value.items():
                if isinstance(value, tuple):
                    value, value_ts = value
                else:
                    # Older data carries no timestamp.
                    value_ts = None
                metric_name, name, labels = json.loads(key)
                labels_key = tuple(sorted(labels.items()))

                metric = metrics.get(metric_name)
                if metric is None:
                    metric = core.Metric(metric_name, 'Distributed metric', typ)
                    metrics[metric_name] = metric

                if typ == 'gauge':
                    metric._multiprocess_mode = multiprocess_mode
                    metric.add_sample(name,
                                      labels_key + (('pid', pid),
                                                    ('hostname', value_hostname)),
                                      value, timestamp=value_ts)
                else:
                    # The duplicates and labels are fixed in the next for.
                    metric.add_sample(name, labels_key, value)

        # Pass 2: reduce per-process samples according to the metric type
        # (and, for gauges, the multiprocess mode).
        for metric in metrics.values():
            samples = defaultdict(float)
            buckets = {}
            samples_ts = {}

            for s in metric.samples:
                name, labels, value, value_ts = s.name, s.labels, s.value, s.timestamp
                if metric.type == 'gauge':
                    without_pid = tuple(l for l in labels if l[0] != 'pid')
                    if metric._multiprocess_mode == 'min':
                        current = samples.setdefault((name, without_pid), value)
                        if value < current:
                            samples[(name, without_pid)] = value
                    elif metric._multiprocess_mode == 'max':
                        current = samples.setdefault((name, without_pid), value)
                        if value > current:
                            samples[(name, without_pid)] = value
                    elif metric._multiprocess_mode == 'livesum':
                        samples[(name, without_pid)] += value
                    elif metric._multiprocess_mode == 'last':
                        # "last": newest timestamp wins, across all hosts/pids.
                        without_hostname_pid = tuple(
                            l for l in labels if l[0] not in ('pid', 'hostname'))
                        if not value_ts:
                            # Some wrong data, possibly from previous versions:
                            # pretend it is 5 hours old so timestamped values win.
                            value_ts = int(time.time()) - 5 * 60 * 60
                        current_ts = samples_ts.setdefault((name, without_hostname_pid), value_ts)
                        if value_ts >= current_ts:
                            samples[(name, without_hostname_pid)] = value
                            samples_ts[(name, without_hostname_pid)] = value_ts
                    else:  # all/liveall: keep one sample per process.
                        samples[(name, labels)] = value

                elif metric.type == 'histogram':
                    bucket = tuple(float(l[1]) for l in labels if l[0] == 'le')
                    if bucket:
                        # _bucket
                        without_le = tuple(l for l in labels if l[0] != 'le')
                        buckets.setdefault(without_le, {})
                        buckets[without_le].setdefault(bucket[0], 0.0)
                        buckets[without_le][bucket[0]] += value
                    else:
                        # _sum/_count
                        samples[(name, labels)] += value

                else:
                    # Counter and Summary.
                    samples[(name, labels)] += value

            # Accumulate bucket values.
            if metric.type == 'histogram':
                for labels, values in buckets.items():
                    acc = 0.0
                    for bucket, value in sorted(values.items()):
                        sample_key = (
                            metric.name + '_bucket',
                            labels + (('le', core._floatToGoString(bucket)),),
                        )
                        if accumulate:
                            acc += value
                            samples[sample_key] = acc
                        else:
                            samples[sample_key] = value
                    if accumulate:
                        # _count equals the accumulated +Inf bucket.
                        samples[(metric.name + '_count', labels)] = acc

            # Convert to correct sample format.
            metric.samples = [
                core.Sample(name, dict(labels), value, samples_ts.get((name, labels)))
                for (name, labels), value in samples.items()
            ]
        return metrics.values()


def mark_distributed_process_dead(pid):
    """Drop a dead process from the global list so its live*/all gauges disappear."""
    remove_from_distributed_list(pid)
"""Tests for the Django-cache-backed ("distributed") metric storage.

Adapted from the multiprocess-mode tests: values are written through
DistributedValue into per-process cache entries and read back through
DistributedCollector.
"""
from __future__ import unicode_literals

import glob
import os
import shutil
import socket
import sys
import tempfile
from random import randint

import django

from prometheus_client import core
from prometheus_client.core import (
    CollectorRegistry, Counter, Gauge, Histogram, Sample, Summary,
)
from prometheus_client.multiprocess import (
    MultiProcessCollector,
)

if sys.version_info < (2, 7):
    # We need the skip decorators from unittest2 on Python 2.6.
    import unittest2 as unittest
else:
    import unittest

from unittest.mock import Mock, patch

# Collector samples carry the real hostname; tests only fake the pid.
hostname = socket.gethostname()


class TestDistributed(unittest.TestCase):
    """End-to-end tests: write via DistributedValue, read via DistributedCollector.

    NOTE(review): the locmem cache (keyed by LOCATION) persists across test
    methods within one process, so metric names are mostly unique per test.
    """

    def patch(self, *args, **kwargs) -> Mock:
        # Convenience wrapper: start a mock.patch and undo it on test cleanup.
        p = patch(*args, **kwargs)
        mock = p.start()
        self.addCleanup(p.stop)
        return mock

    def setUp(self):
        # Configure an in-memory Django cache the first time only; settings
        # cannot be configured twice in one process.
        from django.conf import settings
        if not settings.configured:
            settings.configure(
                CACHES={
                    'default': {
                        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
                        'LOCATION': 'extra-fast-sqlite-test',
                    }
                }
            )

        # Replace the pid lookup so one test can impersonate several processes.
        self.pid = None
        from prometheus_client.distributed import _pidFunc
        self._original_pidFunc = _pidFunc
        self.patch('prometheus_client.distributed._pidFunc', new=self.pidFunc)

        # Route all metric values through the cache-backed value class.
        from prometheus_client.distributed import DistributedValue
        core._ValueClass = DistributedValue
        self.registry = CollectorRegistry()
        from prometheus_client.distributed import DistributedCollector
        self.collector = DistributedCollector(self.registry)

    def pidFunc(self):
        # Fake pid when a test set one, real pid otherwise.
        if self.pid is not None:
            return self.pid
        return self._original_pidFunc()

    def tearDown(self):
        # Restore the default in-process value class.
        core._ValueClass = core._MutexValue

    def test_counter_adds(self):
        # Two Counter instances with the same name share the same cache slot.
        c1 = Counter('c2', 'help', registry=None)
        c2 = Counter('c2', 'help', registry=None)
        self.assertEqual(0, self.registry.get_sample_value('c2_total'))
        c1.inc(1)
        c2.inc(2)
        self.assertEqual(3, self.registry.get_sample_value('c2_total'))

    def test_summary_adds(self):
        s1 = Summary('s', 'help', registry=None)
        s2 = Summary('s', 'help', registry=None)
        self.assertEqual(0, self.registry.get_sample_value('s_count'))
        self.assertEqual(0, self.registry.get_sample_value('s_sum'))
        s1.observe(1)
        s2.observe(2)
        self.assertEqual(2, self.registry.get_sample_value('s_count'))
        self.assertEqual(3, self.registry.get_sample_value('s_sum'))

    def test_histogram_adds(self):
        h1 = Histogram('h', 'help', registry=None)
        h2 = Histogram('h', 'help', registry=None)
        self.assertEqual(0, self.registry.get_sample_value('h_count'))
        self.assertEqual(0, self.registry.get_sample_value('h_sum'))
        self.assertEqual(0, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))
        h1.observe(1)
        h2.observe(2)
        self.assertEqual(2, self.registry.get_sample_value('h_count'))
        self.assertEqual(3, self.registry.get_sample_value('h_sum'))
        self.assertEqual(2, self.registry.get_sample_value('h_bucket', {'le': '5.0'}))

    # Disabled: gauge mode "all" is supported by the distributed storage
    # (it behaves like "liveall"), so it no longer raises.
    # def test_gauge_all(self):
    #     self.pid = 123
    #     with self.assertRaises(Exception) as e:
    #         g1 = Gauge('g1', 'help', registry=None)
    #     self.assertTrue('not supported' in str(e.exception))

    def test_gauge_last(self):
        # "last" mode: the most recently written value wins, across processes.
        self.pid = 123
        g1 = Gauge('g1last', 'help', registry=None, multiprocess_mode='last')
        g1.set(1)
        self.pid = 456
        g1.set(2)
        self.assertEqual(2, self.registry.get_sample_value('g1last'))

    def test_gauge_liveall(self):
        # "liveall": one sample per live process, labelled with hostname+pid.
        self.pid = 123
        g1 = Gauge('g2', 'help', registry=None, multiprocess_mode='liveall')
        self.pid = 456
        g2 = Gauge('g2', 'help', registry=None, multiprocess_mode='liveall')
        self.assertEqual(0, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'}))
        self.assertEqual(0, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'}))
        self.pid = 123
        g1.set(1)
        self.pid = 456
        g2.set(2)
        self.assertEqual(1, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'}))
        self.assertEqual(2, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'}))
        from prometheus_client.distributed import mark_distributed_process_dead
        # Marking a process dead removes its samples from the output.
        mark_distributed_process_dead(123)
        self.assertEqual(None, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '123'}))
        self.assertEqual(2, self.registry.get_sample_value('g2', {'hostname':hostname,'pid': '456'}))

    def test_gauge_min(self):
        self.pid = 123
        g1 = Gauge('gm', 'help', registry=None, multiprocess_mode='min')
        self.pid = 456
        g2 = Gauge('gm', 'help', registry=None, multiprocess_mode='min')
        self.assertEqual(0, self.registry.get_sample_value('gm', {'hostname':hostname}))
        self.pid = 123
        g1.set(1)
        self.pid = 456
        g2.set(2)
        self.assertEqual(1, self.registry.get_sample_value('gm', {'hostname':hostname}))

    def test_gauge_max(self):
        self.pid = 123
        g1 = Gauge('gmax', 'help', registry=None, multiprocess_mode='max')
        self.pid = 456
        g2 = Gauge('gmax', 'help', registry=None, multiprocess_mode='max')
        self.assertEqual(0, self.registry.get_sample_value('gmax', {'hostname':hostname}))
        self.pid = 123
        g1.set(1)
        self.pid = 456
        g2.set(2)
        self.assertEqual(2, self.registry.get_sample_value('gmax', {'hostname':hostname}))

    def test_gauge_livesum(self):
        self.pid = 123
        g1 = Gauge('gls', 'help', registry=None, multiprocess_mode='livesum')
        self.pid = 456
        g2 = Gauge('gls', 'help', registry=None, multiprocess_mode='livesum')
        self.assertEqual(0, self.registry.get_sample_value('gls', {'hostname':hostname}))
        self.pid = 123
        g1.set(1)
        self.pid = 456
        g2.set(2)
        self.assertEqual(3, self.registry.get_sample_value('gls', {'hostname':hostname}))
        from prometheus_client.distributed import mark_distributed_process_dead
        # Dead process drops out of the live sum.
        mark_distributed_process_dead(123)
        self.assertEqual(2, self.registry.get_sample_value('gls', {'hostname':hostname}))

    def test_namespace_subsystem(self):
        c1 = Counter('c', 'help', registry=None, namespace='ns', subsystem='ss')
        c1.inc(1)
        self.assertEqual(1, self.registry.get_sample_value('ns_ss_c_total'))

    def test_counter_across_forks(self):
        # Each fake pid gets its own storage; the collector sums them.
        self.pid = 0
        c1 = Counter('c', 'help', registry=None)
        self.assertEqual(0, self.registry.get_sample_value('c_total'))
        c1.inc(1)
        c1.inc(1)
        self.pid = 1
        c1.inc(1)
        self.assertEqual(3, self.registry.get_sample_value('c_total'))
        # The live value object only sees the current process' share.
        self.assertEqual(1, c1._value.get())

    @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
    def test_collect(self):
        self.pid = 0

        labels = dict((i, i) for i in 'abcd')

        def add_label(key, value):
            # Copy of the base labels plus one extra pair.
            l = labels.copy()
            l[key] = value
            return l

        c = Counter('c', 'help', labelnames=labels.keys(), registry=None)
        g = Gauge('g', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='liveall')
        gall = Gauge('gall', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='all')
        gempty = Gauge('gempty', 'help', labelnames=labels.keys(), registry=None, multiprocess_mode='all')
        h = Histogram('h', 'help', labelnames=labels.keys(), registry=None)

        c.labels(**labels).inc(1)
        g.labels(**labels).set(1)
        gall.labels(**labels).set(1)
        h.labels(**labels).observe(1)

        self.pid = 1

        c.labels(**labels).inc(1)
        g.labels(**labels).set(1)
        gall.labels(**labels).set(1)
        h.labels(**labels).observe(5)

        metrics = dict((m.name, m) for m in self.collector.collect())

        self.assertEqual(
            metrics['c'].samples, [Sample('c_total', labels, 2.0)]
        )
        metrics['g'].samples.sort(key=lambda x: x[1]['pid'])
        # Hostname is environment-dependent; strip it before comparing.
        for sample in metrics['g'].samples:
            sample.labels.pop('hostname')
        for sample in metrics['gall'].samples:
            sample.labels.pop('hostname')
        self.assertEqual(metrics['g'].samples, [
            Sample('g', add_label('pid', '0'), 1.0),
            Sample('g', add_label('pid', '1'), 1.0),
        ])
        self.assertEqual(metrics['gall'].samples, [
            Sample('gall', add_label('pid', '0'), 1.0),
            Sample('gall', add_label('pid', '1'), 1.0),
        ])

        metrics['h'].samples.sort(
            key=lambda x: (x[0], float(x[1].get('le', 0)))
        )
        # Buckets are cumulative here (accumulate=True, the default).
        expected_histogram = [
            Sample('h_bucket', add_label('le', '0.005'), 0.0),
            Sample('h_bucket', add_label('le', '0.01'), 0.0),
            Sample('h_bucket', add_label('le', '0.025'), 0.0),
            Sample('h_bucket', add_label('le', '0.05'), 0.0),
            Sample('h_bucket', add_label('le', '0.075'), 0.0),
            Sample('h_bucket', add_label('le', '0.1'), 0.0),
            Sample('h_bucket', add_label('le', '0.25'), 0.0),
            Sample('h_bucket', add_label('le', '0.5'), 0.0),
            Sample('h_bucket', add_label('le', '0.75'), 0.0),
            Sample('h_bucket', add_label('le', '1.0'), 1.0),
            Sample('h_bucket', add_label('le', '2.5'), 1.0),
            Sample('h_bucket', add_label('le', '5.0'), 2.0),
            Sample('h_bucket', add_label('le', '7.5'), 2.0),
            Sample('h_bucket', add_label('le', '10.0'), 2.0),
            Sample('h_bucket', add_label('le', '+Inf'), 2.0),
            Sample('h_count', labels, 2.0),
            Sample('h_sum', labels, 6.0),
        ]

        self.assertEqual(metrics['h'].samples, expected_histogram)

    @unittest.skipIf(sys.version_info < (2, 7), "Test requires Python 2.7+.")
    def test_merge_no_accumulate(self):
        self.pid = 0
        labels = dict((i, i) for i in 'abcd')

        def add_label(key, value):
            # Copy of the base labels plus one extra pair.
            l = labels.copy()
            l[key] = value
            return l

        h = Histogram('hna', 'help', labelnames=labels.keys(), registry=None)
        h.labels(**labels).observe(1)
        self.pid = 1
        h.labels(**labels).observe(5)

        self.collector.accumulate = False
        metrics = self.collector.collect()
        self.collector.accumulate = True

        metric = [x for x in metrics if x.name == 'hna'][0]
        metric.samples.sort(
            key=lambda x: (x[0], float(x[1].get('le', 0)))
        )
        # With accumulate=False buckets are raw per-bucket counts and no
        # _count sample is synthesized.
        expected_histogram = [
            Sample('hna_bucket', add_label('le', '0.005'), 0.0),
            Sample('hna_bucket', add_label('le', '0.01'), 0.0),
            Sample('hna_bucket', add_label('le', '0.025'), 0.0),
            Sample('hna_bucket', add_label('le', '0.05'), 0.0),
            Sample('hna_bucket', add_label('le', '0.075'), 0.0),
            Sample('hna_bucket', add_label('le', '0.1'), 0.0),
            Sample('hna_bucket', add_label('le', '0.25'), 0.0),
            Sample('hna_bucket', add_label('le', '0.5'), 0.0),
            Sample('hna_bucket', add_label('le', '0.75'), 0.0),
            Sample('hna_bucket', add_label('le', '1.0'), 1.0),
            Sample('hna_bucket', add_label('le', '2.5'), 0.0),
            Sample('hna_bucket', add_label('le', '5.0'), 1.0),
            Sample('hna_bucket', add_label('le', '7.5'), 0.0),
            Sample('hna_bucket', add_label('le', '10.0'), 0.0),
            Sample('hna_bucket', add_label('le', '+Inf'), 0.0),
            Sample('hna_sum', labels, 6.0),
        ]

        self.assertEqual(metric.samples, expected_histogram)


class TestUnsetEnv(unittest.TestCase):
    """MultiProcessCollector must reject a missing/invalid sync directory."""

    def setUp(self):
        self.registry = CollectorRegistry()
        # A plain file (not a directory) to use as an invalid sync path.
        fp, self.tmpfl = tempfile.mkstemp()
        os.close(fp)

    def test_unset_syncdir_env(self):
        # No prometheus_multiproc_dir configured at all.
        self.assertRaises(
            ValueError, MultiProcessCollector, self.registry)

    def test_file_syncpath(self):
        # Path exists but is a regular file, not a directory.
        registry = CollectorRegistry()
        self.assertRaises(
            ValueError, MultiProcessCollector, registry, self.tmpfl)

    def tearDown(self):
        os.remove(self.tmpfl)