From a990b591c16b3e1f12c162a0701d7ada0c18deb5 Mon Sep 17 00:00:00 2001 From: Venkat Sundaram Date: Mon, 16 May 2016 01:40:05 -0600 Subject: [PATCH] Add support for Monasca Log Monasca is the OpenStack project for MONitoring At SCAle. Monasca also includes a Logging API that supports centralized logging of logs. The monasca-log-api is a OpenStack based microservice and supports Keystone (Identity Management) based authentication. It provides a secure, reliable and multi-tenant based Logging as a service. The flow is as follows: 1. Using the monsaca log configuration properties, get a token from keystone 2. Cache this token for performance 3. Get the monasca-log-api url from the keystone token service catalog. If the monasca_log_hosts is specified in the config, that would be used instead of the keystone catalog. This config property is a list of host names (or IPs) separated by comma 4. If the hosts list is specified, select a random host and form an url using that 5. Use the log url thus arrived at and open a HTTP connection. If it fails, get another random host. Use retries as well 6. Build the monasca-log-api dimensions, token and content-type (HTTP headers) for the request 7. Send the request to the open HTTP connection (using connection pools to reuse connections where possible) 8. Batch the requests to avoid network overhead and improve throughput Added unit tests and refactored for better readability --- beaver/config.py | 8 + beaver/dispatcher/tail.py | 3 +- beaver/tests/test_monascalog_transport.py | 200 ++++++++ beaver/transports/monascalog_transport.py | 554 ++++++++++++++++++++++ beaver/worker/tail.py | 3 +- requirements/base.txt | 1 + requirements/monasca.txt | 2 + requirements/tests.txt | 1 + 8 files changed, 770 insertions(+), 2 deletions(-) create mode 100644 beaver/tests/test_monascalog_transport.py create mode 100644 beaver/transports/monascalog_transport.py create mode 100644 requirements/monasca.txt diff --git a/beaver/config.py b/beaver/config.py index a246dc14..74a232f5 100644 --- a/beaver/config.py +++ b/beaver/config.py @@ -20,6 +20,7 @@ def __init__(self, args, logger=None): 'add_field_env': '', 'debug': '0', 'discover_interval': '15', + 'inter_pass_interval': '0.1', 'encoding': 'utf_8', # should be a python regex of files to remove @@ -132,12 +133,16 @@ def __init__(self, args, logger=None): # consumer processes 'number_of_consumer_processes': '1', + # bytes to read from a file at a time + 'bytes_to_read': '4096', + # interprocess queue max size before puts block 'max_queue_size': '100', # time in seconds before updating the file mapping 'update_file_mapping_time': '', # deprecated 'discover_interval': '15', + 'inter_pass_interval': '0.1', # time in seconds from last command sent before a queue kills itself 'queue_timeout': '60', @@ -185,6 +190,7 @@ def __init__(self, args, logger=None): # Ignore files older then n days, use 0 to disable 'ignore_old_files': 0 + } self._configfile = args.config @@ -331,6 +337,7 @@ def _main_parser(config): 'kafka_batch_t', 'kafka_ack_timeout', 'number_of_consumer_processes', + 'bytes_to_read', 'ignore_old_files' ] for key in require_int: @@ -340,6 +347,7 @@ def _main_parser(config): require_float = [ 'update_file_mapping_time', 'discover_interval', + 'inter_pass_interval', ] for key in require_float: diff --git a/beaver/dispatcher/tail.py b/beaver/dispatcher/tail.py index 7642530c..edfb5b6a 100644 --- a/beaver/dispatcher/tail.py +++ b/beaver/dispatcher/tail.py @@ -80,7 +80,8 @@ def create_queue_producer(): callback=queue_put, logger=logger ) - manager.run() + inter_pass_interval = beaver_config.get('inter_pass_interval') + manager.run(interval=inter_pass_interval) while 1: diff --git a/beaver/tests/test_monascalog_transport.py b/beaver/tests/test_monascalog_transport.py new file mode 100644 index 00000000..424d110b --- /dev/null +++ b/beaver/tests/test_monascalog_transport.py @@ -0,0 +1,200 @@ +# -*- coding: utf-8 -*- +import sys +if sys.version_info < (2, 7): + import unittest2 as unittest +else: + import unittest + +import socket +import mock +import tempfile +import logging +import httpretty + +import beaver +from beaver.config import BeaverConfig +from beaver.transports import create_transport +from beaver.transports.exception import TransportException + +from beaver.unicode_dammit import unicode_dammit + +from fixtures import Fixture, ZookeeperFixture, KafkaFixture + +from beaver.transports.monascalog_transport import MonascalogTransport +from beaver.transports.monascalog_transport import MonascaLogRecord +from beaver.transports.monascalog_transport import MonascaLogger + + +logging.basicConfig(loglevel=logging.DEBUG) +logger = logging.getLogger("test") + +class MonascaLogRecordTest(unittest.TestCase): + + @classmethod + def SetUpClass(cls): + pass + + @classmethod + def TearDownClass(cls): + pass + + def test_plain_text_format(self): + # test a simple plain text log + record = MonascaLogRecord("simple log line") + self.assertEqual(record.is_json(), False) + + def test_json_format(self): + # test a simple json log line + record = MonascaLogRecord('{"message": "simple log line", "type": "keystone"}', + is_json=True, + logger=logger) + self.assertEqual(record.is_json(), True) + + # get the json payload and make sure it is formatted in a way log api + # expects it + json_line = record.to_json() + self.assertIn("dimensions", json_line.keys()) + self.assertIn("type", json_line["dimensions"].keys()) + self.assertEqual(json_line["message"], "simple log line") + self.assertEqual(record.is_valid(), True) + + +class MonascaLoggerTest(unittest.TestCase): + + @classmethod + def SetUpClass(cls): + pass + + @classmethod + def TearDownClass(cls): + pass + + def test_construction(self): + m_logger = MonascaLogger(logger=logger) + self.assertIsInstance(m_logger, MonascaLogger) + self.assertEqual(m_logger._enable_batching, True) + +# using localhost to prevent proxy issues with httpretty +#LOG_URL = "http://www.logapi.com:5607/v3.0/logs" +LOG_URL = "http://localhost:5607/v3.0/logs" + +class MonascalogTests(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.logger = logging.getLogger(__name__) + + empty_conf = tempfile.NamedTemporaryFile(delete=True) + cls.beaver_config = BeaverConfig(mock.Mock(config=empty_conf.name)) + cls.server_host = "localhost" + cls.server_port = 5607 + cls.keystone_auth_url = "https://www.keystone.com:5000/v3.0/auth/tokens" + cls.keystone_user = "logger" + cls.keystone_password = "logger" + cls.keystone_project_name = "logger" + cls.keystone_domain_name = "logger" + cls.log_requests = [] + cls.hostname = socket.gethostname() + + @classmethod + def tearDownClass(cls): + pass + + @httpretty.activate + @mock.patch('beaver.transports.monascalog_transport.KeystoneClientHelper.get_token_and_log_url', return_value=("1234",LOG_URL)) + def test_monascalog(cls, token_mock): + + # dynamic callback to verify the log messages sent by the transport + def request_callback(request, uri, headers): + cls.log_requests.append(request.parsed_body) + return (204, headers, "created") + + # fake the first get call that is used for checking connection + httpretty.register_uri(httpretty.GET, LOG_URL, status=405, body="Method Not Allowed") + httpretty.register_uri(httpretty.POST, LOG_URL, status=204, body="Created") + cls.beaver_config.set('transport', 'monascalog') + cls.beaver_config.set('logstash_version', 1) + #cls.beaver_config.set('monascalog_hosts', "{}:{}".format(cls.server_host, cls.server_port)) + cls.beaver_config.set('monascalog_max_retries', 3) + cls.beaver_config.set('monascalog_auth_url', cls.keystone_auth_url) + cls.beaver_config.set('monascalog_user_name', cls.keystone_user) + cls.beaver_config.set('monascalog_password', cls.keystone_password) + cls.beaver_config.set('monascalog_project_name', cls.keystone_project_name) + cls.beaver_config.set('monascalog_domain_name', cls.keystone_domain_name) + + transport = create_transport(cls.beaver_config, logger=cls.logger) + + cls.assertIsInstance(transport, beaver.transports.monascalog_transport.MonascalogTransport) + cls.assertEqual(transport.valid(), True) + + data = {} + lines = [] + n=100 + for i in range(n): + lines.append('log' + str(i) + '\n') + new_lines = [] + for line in lines: + message = unicode_dammit(line) + if len(message) == 0: + continue + new_lines.append(message) + data['lines'] = new_lines + data['fields'] = [] + cls.assertEqual(transport.callback("test.log", **data), True) + + # Fake a log api failure + httpretty.reset() + httpretty.register_uri(httpretty.POST, LOG_URL, status=500, body="Internal Server Error") + cls.assertRaises(TransportException, transport.callback, "test.log", **data) + + # simulate a single failure followed by success, to test if retry works + httpretty.reset() + httpretty.register_uri(httpretty.POST, LOG_URL, responses=[ + httpretty.Response(status=503, body="Service Unavailable"), + httpretty.Response(status=204, body="Created") + ]) + cls.assertEqual(transport.callback("test.log", **data), True) + + # next, test if the logs made it to the server + httpretty.reset() + httpretty.register_uri(httpretty.POST, LOG_URL, body=request_callback) + # clear logs from previous tests + del cls.log_requests[:] + cls.assertEqual(transport.callback("test.log", **data), True) + cls._consume_messages(n) + + # repeat same test, but with batching turned off + cls.beaver_config.set('monascalog_enable_batching', False) + httpretty.reset() + httpretty.register_uri(httpretty.POST, LOG_URL, body=request_callback) + # fake the first get call that is used for checking connection + httpretty.register_uri(httpretty.GET, LOG_URL, status=405, body="Method Not Allowed") + transport = create_transport(cls.beaver_config, logger=cls.logger) + # clear logs from previous tests + del cls.log_requests[:] + cls.assertEqual(transport.callback("test.log", **data), True) + cls._consume_messages(n, batching=False) + transport.interrupt() + + def _consume_messages(cls, number_of_messages, batching=True): + messages = cls.log_requests + for message in messages: + cls.assertIn("dimensions", message) + cls.assertIn("logs", message) + cls.assertIsInstance(message["dimensions"], dict) + dims = message["dimensions"] + cls.assertEqual(dims["file"], "test.log") + cls.assertEqual(dims["host"], cls.hostname) + cls.assertIsInstance(message["logs"], list) + for log in message["logs"]: + cls.assertIn("message", log) + if not log["message"]: + cls.fail("Log message is empty") + #print(message) + #print('\n') + + if not batching: + cls.assertEqual(len(messages), number_of_messages) + else: + msg = messages[0] + cls.assertEqual(len(msg["logs"]), number_of_messages) diff --git a/beaver/transports/monascalog_transport.py b/beaver/transports/monascalog_transport.py new file mode 100644 index 00000000..8f12a72c --- /dev/null +++ b/beaver/transports/monascalog_transport.py @@ -0,0 +1,554 @@ +# -*- coding: utf-8 -*- +import time +import os +import random +import requests +import json +import collections +import logging +import types + +from beaver.transports.base_transport import BaseTransport +from beaver.transports.exception import TransportException +from dogpile.cache import make_region +from keystoneclient.auth.identity import v3 +from keystoneclient import session +from keystoneclient.v3 import client + + +MAX_LEN=255 + +class MonascalogTransport(BaseTransport): + + def __init__(self, beaver_config, logger=None): + super(MonascalogTransport, self).__init__(beaver_config, logger=logger) + + self._is_valid = False + self._beaver_config = beaver_config + self._parse_config_entries(self._beaver_config) + self._monasca_logger = MonascaLogger( + auth_url = self._keystone_auth_url, + username = self._keystone_user_name, + password = self._keystone_password, + project_name = self._keystone_project_name, + user_domain_name = self._keystone_domain_name, + project_domain_name = self._keystone_domain_name, + log_api_url = self._monasca_log_url, + log_api_hosts = self._monasca_log_hosts, + log_api_protocol = self._monasca_log_protocol, + log_api_uri = self._monasca_log_uri, + enable_batching = self._enable_batching, + token_cache_expiration_secs = self._cache_expiration_secs, + retry_on_failure = True, + max_failure = self._max_retries, + logger = self._logger + ) + self._log_session = None + self._log_api_url = None + self._token = None + + self._connect() + + def _parse_config_entries(self, beaver_config): + self._keystone_auth_url = beaver_config.get('monascalog_auth_url') + log_hosts = beaver_config.get('monascalog_hosts') + self._monasca_log_hosts = log_hosts.split(",") if log_hosts else [] + self._monasca_log_url = beaver_config.get('monascalog_url') + self._monasca_log_protocol = beaver_config.get('monascalog_protocol') + self._monasca_log_uri = beaver_config.get('monascalog_uri') + self._keystone_user_name = beaver_config.get('monascalog_user_name') + self._keystone_password = beaver_config.get('monascalog_password') + self._keystone_project_name = beaver_config.get('monascalog_project_name') + self._keystone_domain_name = beaver_config.get('monascalog_domain_name') + self._max_retries = int(beaver_config.get('monascalog_max_retries')) + batching = beaver_config.get('monascalog_enable_batching', 'true') + if batching in ['true','True','yes','Yes','y','Y']: + self._enable_batching = True + else: + self._enable_batching = False + cache_expire_secs = beaver_config.get('monascalog_cache_expiration_secs') + if cache_expire_secs: + self._cache_expiration_secs = int(cache_expire_secs) + else: + # use the queue expiration time instead + self._cache_expiration_secs = int(beaver_config.get('queue_timeout')) + + def _connect(self): + self._is_valid = self._monasca_logger.check_connection() + + def reconnect(self): + self._connect() + + def invalidate(self): + """Invalidates the current transport""" + super(MonascalogTransport, self).invalidate() + self._monasca_logger.clear_cache() + return False + + def _get_global_dimensions(self, filename, kwargs, normalize=True): + """Returns formatted dimensions that are common for all lines""" + timestamp = self.get_timestamp(**kwargs) + dimensions = { + self._fields.get('type'): kwargs.get('type'), + '@timestamp': timestamp, + self._fields.get('host'): self._current_host, + self._fields.get('file'): filename, + } + + if self._logstash_version == 0: + dimensions['@source'] = 'file://{0}'.format(filename) + dimensions['@fields'] = kwargs.get('fields') + else: + dimensions['@version'] = "{0}".format(self._logstash_version) + fields = kwargs.get('fields') + for key in fields: + dimensions[key] = fields[key] + + self._logger.debug("DIMENSIONS = {}".format(dimensions)) + if normalize: + dimensions = normalize_dimensions(dimensions, self._logger) + return dimensions + + def callback(self, filename, lines, **kwargs): + #FIXME: change this from info to debug + self._logger.info('About to batch {0} lines for file {1}'.\ + format(len(lines), filename) ) + self._log_format = self._beaver_config.get_field('format', filename) + global_dimensions = self._get_global_dimensions(filename, kwargs) + success, code, reason = self._monasca_logger.log_multi(lines, global_dimensions, self._log_format == "rawjson") + if not success: + raise TransportException("{}: {}".format(code,reason)) + return True + +########################################################## + +# Helper function to print requests size details +def print_req_stats(r, *args, **kwargs): + logger = getattr(print_req_stats, "logger", logging.getLogger("monascalog_transport")) + logger.debug("status_code: {}, reason: {}, url: {}".format(r.status_code, + r.reason, r.url)) + if r.ok: + req_body_len = len(r.request.body) + req_headers_len = len('\n'.join("{}: {}".format(k,v) for k,v in r.request.headers.iteritems())) + res_body_len = len(r.content) + res_headers_len = len('\n'.join("{}: {}".format(k,v) for k,v in r.headers.iteritems())) + total_size = req_body_len + req_headers_len + res_body_len + res_headers_len + log_lines_len = getattr(print_req_stats, "lines_length", -1) + logger.debug("Log size = {} bytes, Request = {} bytes, Response = {} bytes total({}) bytes and it took {} seconds".format(log_lines_len, req_body_len+req_headers_len, res_body_len+res_headers_len, total_size, r.elapsed.total_seconds())) + + +# Helper method to normalize dictionary and make it suitable to be sent +# as dimensions to log api +def normalize_dimensions(dimensions, logger): + #First, flatten if there are any dict fields + normalized_dimensions = flatten(dimensions) + + keys_to_remove = [] + #Next, if there are any list items, flatten + for key in normalized_dimensions: + value = normalized_dimensions[key] + logger.debug("KEY = {} VALUE = {}".format(key,value)) + if isinstance(value, list): + value = normalized_dimensions[key] = ",".join(value) + if value and not isinstance(value, types.StringTypes): + value = normalized_dimensions[key] = str(value) + if not value: + # log api doesn't like empty dimensions + keys_to_remove.append(key) + + for k in keys_to_remove: + logger.debug("Removing empty dimension entry: {}".format(k)) + del normalized_dimensions[k] + return normalized_dimensions + +# Helper method to flatten a dictionary +def flatten(d, parent_key='', sep='_'): + items = [] + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) + + +""" +MonascaLogRecord abstracts an individual log line string. +It could be instantiated with the line that is read +from a log file or just a string. +If this is a JSON log line read from a file (for example, +Beaver does this), this would still be a string and has +to be converted to JSON. To do this, caller must set +is_json to True. +If this is just a plain text, the is_json field need +not be specified. +Once instantiated, the to_json method would format the +log in a manner suitable to be posted to the Monasca +Log API and returns the JSON payload. +""" +class MonascaLogRecord(object): + + def __init__(self, log_line, is_json=False, logger=None): + self._log_line = log_line + self._json_format = is_json + self._logger = logger + + def is_json(self, validate=False): + if validate and self._json_format: + try: + valid_dict = json.loads(self._log_line) + return True + except ValueError as ve: + return False + return self._json_format + + def _validate_length(self, field): + if len(field) > MAX_LEN: + return (False, "{} exceeds {} chars!".format(field, MAX_LEN)) + return (True, "") + + def is_valid(self): + """Check if this object would make a payload + that confirms to Monasca log api validation + rules""" + errors = ["Dimension Validation Errors:"] + #Get the json payload first + json_payload = self.to_json(normalize=False) + #check if the dimensions are valid + for k,v in json_payload["dimensions"].iteritems(): + # check if value is non-empty + if not v: + reason = "Value for key:{} is empty".format(k) + errors.append(reason) + continue + # check if the value is of string type + if not isinstance(v, types.StringTypes): + valid = False + errors.append("Dimension value has to be string type. {} is not!".format(v)) + continue # as the remaining checks are for a string type + # check if the key and value lengths are valid + valid, reason = self._validate_length(k) + if not valid: + errors.append(reason) + valid, reason = self._validate_length(v) + if not valid: + errors.append(reason) + success = len(errors) == 1 + return success + + def to_json(self, normalize=True): + output = {"message": self._get_log_message(), + "dimensions": self._get_dimensions(normalize)} + return output + + + def _get_dimensions(self, normalize=True): + if self.is_json(validate=True): + dims = json.loads(self._log_line) + dims.pop("message", "") + else: + dims = {} + + if normalize: + dims = normalize_dimensions(dims, self._logger) + return dims + + def _get_log_message(self): + if self.is_json(validate=True): + return json.loads(self._log_line).get("message", "") + else: + return self._log_line + + +"""MonascaLogger implements the core transport details that + formats, packages and ships logs to Monasca Log API. + It uses a Keystone helper class to cache tokens used for + authenticating the log requests""" +class MonascaLogger(object): + + def __init__(self, **kwargs): + # extract keystone related fields + self._keystone_auth_url = kwargs.get("auth_url", os.getenv("OS_AUTH_URL", "http://localhost:5000/v3")) + self._keystone_username = kwargs.get("username", os.getenv("OS_USERNAME")) + self._keystone_user_domain_name = kwargs.get("user_domain_name", os.getenv("OS_USER_DOMAIN_NAME")) + self._keystone_project_name = kwargs.get("project_name", os.getenv("OS_PROJECT_NAME")) + self._keystone_project_domain_name = kwargs.get("project_domain_name", os.getenv("OS_PROJECT_DOMAIN_NAME")) + self._keystone_password = kwargs.get("password", os.getenv("OS_PASSWORD")) + self._keystone_token_cache_expiration_secs = kwargs.get("token_cache_expiration_secs", 3600) + + # extract log_api related fields + self._log_api_hosts = kwargs.get("log_api_hosts", os.getenv("OS_LOG_API_HOSTS")) + if self._log_api_hosts: + # get protocol and uri + self._log_api_protocol = kwargs.get("log_api_protocol", os.getenv("OS_LOG_API_PROTOCOL", "http")) + self._log_api_uri = kwargs.get("log_api_uri", os.getenv("OS_LOG_API_URI", "v3.0/logs")) + + self._log_api_url = kwargs.get("log_api_url", os.getenv("OS_LOG_API_URL")) + + # extract other fields + enable_batching = kwargs.get("enable_batching", os.getenv("OS_LOG_API_ENABLE_BATCHING","True").lower()) + self._enable_batching = enable_batching in [True, "true", "yes", "y", "1"] + retry_on_failure = kwargs.get("retry_on_failure", os.getenv("OS_LOG_API_RETRY_ON_FAILURE", "False").lower()) + self._retry_on_failure = retry_on_failure in ["true", "yes", "y", "1"] + self._max_failure = int(kwargs.get("max_failure", os.getenv("OS_LOG_API_MAX_FAILURE", 3))) + self._exp_backoff_secs = float(kwargs.get("exp_backoff_secs", os.getenv("OS_LOG_API_EXP_BACKOFF_SECS", 2))) + self._logger = kwargs.get("logger", None) + self._log_session = None + self._token = None + # construct a keystone client + self._keystone = KeystoneClientHelper(self._keystone_auth_url, + self._keystone_username, + self._keystone_password, + self._keystone_project_name, + self._keystone_user_domain_name, + self._keystone_project_domain_name, + self._logger, + self._keystone_token_cache_expiration_secs) + + def _get_token_and_log_server_url(self): + # get from keystone + token, log_url = self._keystone.get_token_and_log_url() + if self._log_api_hosts: + # get a random server from the list + self._log_api_url = "{0}://{1}/{2}".format(self._log_api_protocol, + random.choice(self._log_api_hosts), + self._log_api_uri + ) + elif not self._log_api_url: + self._log_api_url = log_url + + self._token = token + + def check_connection(self): + failure = 1 + print_req_stats.logger = self._logger + while True: + if not self._log_session: + self._log_session = requests.Session() + try: + self._get_token_and_log_server_url() + self._logger.info('connect: {0}'.format(self._log_api_url)) + self._log_session.get(self._log_api_url, headers={"X-Auth-Token": self._token}, + verify=False, + hooks=dict(response=print_req_stats)) + except Exception, e: + self._logger.error( + 'Exception caught validating url connection: ' + str(e)) + if failure < self._max_failure: + failure += 1 + time.sleep(self._exp_backoff_secs ** failure) + self._logger.debug( + 'Retrying connection, attempt {0}'.format(failure)) + continue + self._logger.error("Failed to connect") + return False + else: + self._logger.info('Connection validated') + return True + + def clear_cache(self): + self._keystone.invalidate_cache() + + def _build_request_headers(self): + headers = {} + headers["X-Auth-Token"] = self._token + headers["Content-Type"] = "application/json" + return headers + + def _build_request_payload(self, lines, global_dimensions, is_json=False): + log_payload = {"dimensions": global_dimensions, + "logs": []} + for line in lines: + record = MonascaLogRecord(line, is_json, logger=self._logger) + log_payload["logs"].append(record.to_json()) + return log_payload + + def _post_logs(self, lines, global_dimensions, is_json=False): + if not isinstance(lines, list): + lines = [lines] + payload = self._build_request_payload(lines, global_dimensions, is_json) + headers = self._build_request_headers() + self._logger.debug('About to post log to url %s' % (self._log_api_url)) + self._logger.debug("Headers: {}".format(headers)) + self._logger.debug("Payload: {}".format(payload)) + message = "" + + lines_length = 0 + for l in lines: + lines_length += len(l) + print_req_stats.lines_length = lines_length + print_req_stats.logger = self._logger + failure = 1 + result = None + while True: + try: + result = self._log_session.post(url=self._log_api_url, + json=payload, + headers=headers, + verify=False, + hooks=dict(response=print_req_stats)) + if result.status_code in range(200, 300): + self._logger.debug('success posting log') + # success + return (True, result.status_code, result.reason) + elif result.status_code in [400, 422]: + # Bad format error, no point in retrying + # Log the error and move on + message = "The server rejected the payload: {0}".\ + format(payload) + self._logger.error(message) + return (True, result.status_code, result.reason) + elif result.status_code in [401,403]: + message = "Invalid token. Will invalidate cache and retry."\ + " status code = %d" % result.status_code + else: + message = 'Post returned non 2xx http status: {0}/{1}'.\ + format(result.status_code, result.reason) + except Exception, e: + message = e.message + + self._logger.debug(message) + if failure < self._max_failure: + failure += 1 + time.sleep(self._exp_backoff_secs ** failure) + self._logger.debug( + 'Retrying connection, attempt {0}'.format(failure)) + continue + status_code = result.status_code if result else None + reason = result.reason if result else None + return_value = (False, status_code, reason) + break + + return return_value + + # Log a single line + def log(self, line, global_dimensions, is_json=False): + return self._post_logs(line, global_dimensions, is_json) + + # Log multiple lines + def log_multi(self, lines, global_dimensions, is_json=False): + result = (True, 200, 'OK') + if self._enable_batching: + result = self._post_logs(lines, global_dimensions, is_json) + else: + for line in lines: + result = self._post_logs(line, global_dimensions, is_json) + if not result[0]: + break + return result + + +# Helper class to get keystone token and logging url from the service catalog +class KeystoneClientHelper(object): + + def __init__(self, auth_url, user_name, password, project_name, user_domain_name, + project_domain_name, logger, cache_expiration_seconds): + self._auth_url = auth_url + self._user_name = user_name + self._password = password + self._project_name = project_name + self._user_domain_name = user_domain_name + self._project_domain_name = project_domain_name + self._logger = logger + + # set an in-memory dogpile cache that would cache token response for + # a configurable period of time + if cache_expiration_seconds > 0: + self._cache = make_region().configure("dogpile.cache.memory", + expiration_time=cache_expiration_seconds) + else: + self._cache = None + + def _get_logging_url_from_catalog(self, auth): + for catalog in auth.auth_ref['catalog']: + if catalog['type'] != 'logging': + continue + for ept in catalog['endpoints']: + if ept['interface'] == 'public': + return ept['url'] + return None + + def _get_cache_entries(self): + token = None + log_url = None + if self._cache: + token = self._cache.get("token") + log_url = self._cache.get("log_url") + return token, log_url + + + def _cache_entries(self, token, log_url): + if not self._cache: + return + # cache the token and url + self._cache.set("token", token) + self._cache.set("log_url", log_url) + + + def get_token_and_log_url(self): + token, log_url = self._get_cache_entries() + if not token: + self._logger.debug("going to call inner get_cached_token") + token, log_url = self._get_token( + self._auth_url, self._user_name, self._password, + self._project_name, self._user_domain_name, + self._project_domain_name) + # cache the token and url, if caching is enabled + self._cache_entries(token, log_url) + + return token, log_url + + def _get_token(self, auth_url, user_name, password, project_name, + user_domain_name, project_domain_name): + logging_url = None + try: + auth = v3.Password(auth_url=auth_url, username=user_name, + password=password, project_name=project_name, + user_domain_name=user_domain_name, + project_domain_name=project_domain_name) + sess = session.Session(auth=auth, verify=False) + keystone = client.Client(session=sess) + token = keystone.session.get_token() + logging_url = self._get_logging_url_from_catalog(auth) + except Exception, e: + self._logger.error("Unable to get token from Keystone: {0}".format(e.message)) + raise TransportException(e.message) + + return token, logging_url + + def invalidate_cache(self): + if self._cache: + self._cache.invalidate("token") + self._cache.invalidate("log_url") + + +def main(): + #logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.DEBUG) + logger = logging.getLogger("test") + + # Note: The constructor would use environment variables if not all params + # are explicitly passed + mlogger = MonascaLogger(logger=logger, retry_on_failure=True, + max_failure=2) + mlogger.check_connection() + line = '{"a":{"b":10}, "m":"tessssssss"}' + lines= ['{"message":"Once upon a time..the end", "type": "nova"}', + line + ] + + rec = MonascaLogRecord(line, is_json=True, logger=logger) + # is_valid call below should return False, as the dict value is not + # a valid dimension + print "Is Valid = {}".format(rec.is_valid()) + result = mlogger.log(line, {"filename": "xyz.log"}, is_json=True) + # however, it should log fine, as MonascaLogger should normalize and + # flatten it + print result + result = mlogger.log_multi(lines, {"filename": "xyz.log"}, is_json=True) + print result + + +if __name__ == "__main__": + main() diff --git a/beaver/worker/tail.py b/beaver/worker/tail.py index 2c1a52e1..15a03e38 100644 --- a/beaver/worker/tail.py +++ b/beaver/worker/tail.py @@ -46,6 +46,7 @@ def __init__(self, filename, callback, position="end", logger=None, beaver_confi self._tail_lines = beaver_config.get_field('tail_lines', filename) self._tags = beaver_config.get_field('tags', filename) self._type = beaver_config.get_field('type', filename) + self._bytes_to_read = int(beaver_config.get('bytes_to_read')) # The following is for the buffered tokenization # Store the specified delimiter @@ -233,7 +234,7 @@ def _run_pass(self): """Read lines from a file and performs a callback against them""" while True: try: - data = self._file.read(4096) + data = self._file.read(self._bytes_to_read) except IOError, e: if e.errno == errno.ESTALE: self.active = False diff --git a/requirements/base.txt b/requirements/base.txt index 6e282a9f..cc374236 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -8,3 +8,4 @@ pika>=0.9.14 python-daemon>=1.5.2,<=1.6.1 redis>=2.7.5 requests +dogpile.cache>=0.5.4 diff --git a/requirements/monasca.txt b/requirements/monasca.txt new file mode 100644 index 00000000..5d96175c --- /dev/null +++ b/requirements/monasca.txt @@ -0,0 +1,2 @@ +-r base.txt +python-keystoneclient>=1.8.1 diff --git a/requirements/tests.txt b/requirements/tests.txt index a7775cc6..b9ded6a1 100644 --- a/requirements/tests.txt +++ b/requirements/tests.txt @@ -6,3 +6,4 @@ six unittest2 coveralls moto +httpretty