diff --git a/CMakeLists.txt b/CMakeLists.txt index f73d9aea9..9e47f6818 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake") find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem) find_package(Git QUIET) find_package(ApMon MODULE) +find_package(CURL MODULE) find_package(RdKafka CONFIG) #################################### @@ -105,11 +106,12 @@ add_library(Monitoring SHARED src/Exceptions/MonitoringException.cxx $<$:src/Backends/ApMonBackend.cxx> $<$:src/Transports/Kafka.cxx> + $<$:src/Transports/HTTP.cxx> ) target_include_directories(Monitoring - PUBLIC - $ + PUBLIC + $ $ PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src @@ -127,6 +129,7 @@ target_link_libraries(Monitoring pthread $<$:ApMon::ApMon> $<$:RdKafka::rdkafka++> + $<$:CURL::libcurl> ) # Handle ApMon optional dependency @@ -138,6 +141,10 @@ if(RdKafka_FOUND) message(STATUS " Compiling Kafka transport") endif() +if(CURL_FOUND) + message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend") +endif() + # Detect operating system if (UNIX AND NOT APPLE) message(STATUS "Detected Linux: Process monitor enabled") @@ -155,6 +162,7 @@ target_compile_definitions(Monitoring $<$:O2_MONITORING_OS_LINUX> $<$:O2_MONITORING_WITH_APPMON> $<$:O2_MONITORING_WITH_KAFKA> + $<$:O2_MONITORING_WITH_CURL> ) # Use C++17 @@ -217,7 +225,7 @@ foreach (test ${TEST_SRCS}) add_executable(${test_name} ${test}) target_link_libraries(${test_name} - PRIVATE + PRIVATE Monitoring Boost::unit_test_framework Boost::filesystem ) add_test(NAME ${test_name} COMMAND ${test_name}) diff --git a/README.md b/README.md index 0267821e1..646f41752 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ See the table below to find `URI`s for supported backends: | InfluxDB | Unix socket | `influxdb-unix` | - | `info` | | InfluxDB | StdOut | `influxdb-stdout` | - | `info` | | InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` | +| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` | | ApMon | UDP | `apmon` | - | `info` | | StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` | @@ -62,7 +63,7 @@ A metric consist of 5 parameters: | Parameter name | Type | Required | Default | | -------------- |:--------------------------------:|:--------:| -----------------------:| | name | string | yes | - | -| values | map<string, int/double/string/uint64_t> | no/1 | - | +| values | map<string, int/double/string/uint64_t> | no/1 | - | | timestamp | time_point<system_clock> | no | current time | | verbosity | Enum (Debug/Info/Prod) | no | Verbosity::Info | | tags | map | no | host and process names | diff --git a/src/Backends/StdOut.cxx b/src/Backends/StdOut.cxx index b61793608..f831f85e7 100644 --- a/src/Backends/StdOut.cxx +++ b/src/Backends/StdOut.cxx @@ -65,8 +65,8 @@ void StdOut::send(std::vector&& metrics) void StdOut::send(const Metric& metric) { - mStream << "[" << mPrefix << "] " << metric.getName(); for (auto& value : metric.getValues()) { + mStream << "[" << mPrefix << "] " << metric.getName(); auto stringValue = std::visit(overloaded{ [](const std::string& value) -> std::string { return value; }, [](auto value) -> std::string { return std::to_string(value); } @@ -74,15 +74,14 @@ void StdOut::send(const Metric& metric) if (metric.getValuesSize() == 1) { mStream << ",0 " << stringValue; } else { - mStream << ' ' << value.first << '=' << stringValue; + mStream << ",0 " << value.first << '=' << stringValue; } + mStream << ' ' << convertTimestamp(metric.getTimestamp()) << ' ' << tagString; + for (const auto& [key, value] : metric.getTags()) { + mStream << ',' << tags::TAG_KEY[key] << "=" << tags::GetValue(value); + } + mStream << '\n'; } - mStream << ' ' << convertTimestamp(metric.getTimestamp()) << ' ' << tagString; - - for (const auto& [key, value] : metric.getTags()) { - mStream << ',' << tags::TAG_KEY[key] << "=" << tags::GetValue(value); - } - mStream << '\n'; } } // namespace backends diff --git a/src/MonitoringFactory.cxx b/src/MonitoringFactory.cxx index c22b327b8..83b1b2a00 100644 --- a/src/MonitoringFactory.cxx +++ b/src/MonitoringFactory.cxx @@ -36,6 +36,10 @@ #include "Transports/Kafka.h" #endif +#ifdef O2_MONITORING_WITH_CURL +#include "Transports/HTTP.h" +#endif + namespace o2 { /// ALICE O2 Monitoring system @@ -56,6 +60,37 @@ std::unique_ptr getStdOut(http::url uri) } } +/// Extracts token from header add sets it as addition HTTP header +/// http://localhost:9999/?org=YOUR_ORG&bucket=YOUR_BUCKET&token=AUTH_TOKEN +/// -> +/// http://localhost:9999/api/v2/write?org=YOUR_ORG&bucket=YOUR_BUCKET +/// --header "Authorization: Token YOURAUTHTOKEN" +std::unique_ptr getInfluxDbv2(http::url uri) +{ +#ifdef O2_MONITORING_WITH_CURL + std::string tokenLabel = "token="; + std::string path = "/api/v2/write"; + std::string query = uri.search; + + auto tokenStart = query.find(tokenLabel); + auto tokenEnd = query.find('&', tokenStart); + if (tokenEnd == std::string::npos) { + tokenEnd = query.length(); + } + std::string token = query.substr(tokenStart + tokenLabel.length(), tokenEnd-(tokenStart + tokenLabel.length())); + // make sure ampersand is removed + if (tokenEnd < query.length() && query.at(tokenEnd) == '&') tokenEnd++; + if (tokenStart > 0 && query.at(tokenStart-1) == '&') tokenStart--; + query.erase(tokenStart, tokenEnd - tokenStart); + + auto transport = std::make_unique("http://" + uri.host + ':' + std::to_string(uri.port) + path + '?' + query); + transport->addHeader("Authorization: Token " + token); + return std::make_unique(std::move(transport)); +#else + throw std::runtime_error("HTTP transport is not enabled"); +#endif +} + std::unique_ptr getInfluxDb(http::url uri) { auto const position = uri.protocol.find_last_of('-'); @@ -129,6 +164,7 @@ std::unique_ptr MonitoringFactory::GetBackend(std::string& url) {"influxdb-unix", getInfluxDb}, {"influxdb-stdout", getInfluxDb}, {"influxdb-kafka", getInfluxDb}, + {"influxdbv2", getInfluxDbv2}, {"apmon", getApMon}, {"no-op", getNoop} }; diff --git a/src/Transports/HTTP.cxx b/src/Transports/HTTP.cxx new file mode 100644 index 000000000..6d86a2b7c --- /dev/null +++ b/src/Transports/HTTP.cxx @@ -0,0 +1,68 @@ +/// +/// \file HTTP.cxx +/// \author Adam Wegrzynek +/// + +#include "HTTP.h" +#include "../MonLogger.h" +#include "../Exceptions/MonitoringException.h" +#include + +namespace o2 +{ +/// ALICE O2 Monitoring system +namespace monitoring +{ +/// Monitoring transports +namespace transports +{ + +HTTP::HTTP(const std::string& url) +{ + mHeaders = NULL; + mCurl = curl_easy_init(); + curl_easy_setopt(mCurl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(mCurl, CURLOPT_SSL_VERIFYPEER, 0); + curl_easy_setopt(mCurl, CURLOPT_CONNECTTIMEOUT, 10); + curl_easy_setopt(mCurl, CURLOPT_TIMEOUT, 10); + curl_easy_setopt(mCurl, CURLOPT_POST, 1); + curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPIDLE, 120L); + curl_easy_setopt(mCurl, CURLOPT_TCP_KEEPINTVL, 60L); + FILE *devnull = fopen("/dev/null", "w+"); + curl_easy_setopt(mCurl, CURLOPT_WRITEDATA, devnull); + + MonLogger::Get() << "HTTP transport initialized (" << url << ")" << MonLogger::End(); +} + +HTTP::~HTTP() +{ + curl_slist_free_all(mHeaders); + curl_easy_cleanup(mCurl); + curl_global_cleanup(); +} + +void HTTP::addHeader(const std::string& header) +{ + mHeaders = curl_slist_append(mHeaders, header.c_str()); + curl_easy_setopt(mCurl, CURLOPT_HTTPHEADER, mHeaders); +} + +void HTTP::send(std::string&& post) +{ + CURLcode response; + long responseCode; + curl_easy_setopt(mCurl, CURLOPT_POSTFIELDS, post.c_str()); + curl_easy_setopt(mCurl, CURLOPT_POSTFIELDSIZE, (long) post.length()); + response = curl_easy_perform(mCurl); + curl_easy_getinfo(mCurl, CURLINFO_RESPONSE_CODE, &responseCode); + if (response != CURLE_OK) { + MonLogger::Get() << "HTTP Tranport " << curl_easy_strerror(response) << MonLogger::End(); + } + if (responseCode < 200 || responseCode > 206) { + MonLogger::Get() << "HTTP Transport: Response code : " << std::to_string(responseCode) << MonLogger::End(); + } +} + +} // namespace transports +} // namespace monitoring +} // namespace o2 diff --git a/src/Transports/HTTP.h b/src/Transports/HTTP.h new file mode 100644 index 000000000..bd9b9b4e9 --- /dev/null +++ b/src/Transports/HTTP.h @@ -0,0 +1,54 @@ +/// +/// \file HTTP.h +/// \author Adam Wegrzynek +/// + +#ifndef ALICEO2_MONITORING_TRANSPORTS_HTTP_H +#define ALICEO2_MONITORING_TRANSPORTS_HTTP_H + +#include "TransportInterface.h" + +#include +#include + +namespace o2 +{ +/// ALICE O2 Monitoring system +namespace monitoring +{ +/// Monitoring transports +namespace transports +{ + +/// \brief HTTP POST transport +/// +/// Allows to push string formatted metrics as HTTP POST requests via cURL +class HTTP : public TransportInterface +{ + public: + /// Constructor + /// \param url URL of HTTP server endpoint + HTTP(const std::string& url); + + /// Destructor + ~HTTP(); + + /// Sends metric via HTTP POST + /// \param post r-value reference string formatted metric + void send(std::string&& post); + + /// Adds custom HTTP header + void addHeader(const std::string& header); + private: + /// CURL pointers + CURL *mCurl; + + /// HTTP headers struct + struct curl_slist *mHeaders; +}; + +} // namespace transports +} // namespace monitoring +} // namespace o2 + +#endif // ALICEO2_MONITORING_TRANSPORTS_HTTP_H diff --git a/src/UriParser/UriParser.h b/src/UriParser/UriParser.h index 00b199d1f..75fea158b 100644 --- a/src/UriParser/UriParser.h +++ b/src/UriParser/UriParser.h @@ -29,7 +29,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. namespace http { struct url { - std::string protocol, user, password, host, path, search; + std::string protocol, user, password, host, path, search, url; int port; }; @@ -89,7 +89,7 @@ static inline url ParseHttpUrl(std::string& in) { url ret; ret.port = -1; - + ret.url = in; ret.protocol = ExtractProtocol(in); ret.search = ExtractSearch(in); ret.path = ExtractPath(in); diff --git a/test/testInfluxDb.cxx b/test/testInfluxDb.cxx index 6ae8c3e61..d34a43c5d 100644 --- a/test/testInfluxDb.cxx +++ b/test/testInfluxDb.cxx @@ -19,7 +19,6 @@ namespace monitoring { namespace Test { - BOOST_AUTO_TEST_CASE(simplySendMetric) { auto monitoring = MonitoringFactory::Get("influxdb-udp://localhost:1000"); @@ -32,6 +31,11 @@ BOOST_AUTO_TEST_CASE(simplySendMetric2) monitoring->send(Metric{10, "myCrazyMetric"}); } +BOOST_AUTO_TEST_CASE(InfluxDbv2) +{ + auto monitoring = MonitoringFactory::Get("influxdbv2://localhost:9999?org=cern&bucket=test&token=TOKEN"); +} + } // namespace Test } // namespace monitoring } // namespace o2