From 7e7f7ce580081250b62475361f1b43069adab1af Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Tue, 6 Feb 2018 13:35:05 +0100 Subject: [PATCH] Write to graphite from statsd Reviewers: buda Reviewed By: buda Subscribers: pullbot, mferencevic Differential Revision: https://phabricator.memgraph.io/D1174 --- src/communication/rpc/messages-inl.hpp | 2 ++ src/stats/stats.cpp | 48 +++++++++++++++++++------- src/stats/stats.hpp | 8 +++-- src/stats/stats_rpc_messages.hpp | 20 +++++++++++ tools/src/mg_statsd/main.cpp | 26 ++++++++++++-- 5 files changed, 86 insertions(+), 18 deletions(-) diff --git a/src/communication/rpc/messages-inl.hpp b/src/communication/rpc/messages-inl.hpp index dc2d463b3..b583852f7 100644 --- a/src/communication/rpc/messages-inl.hpp +++ b/src/communication/rpc/messages-inl.hpp @@ -78,3 +78,5 @@ BOOST_CLASS_EXPORT(distributed::IndexLabelPropertyTx); // Stats. BOOST_CLASS_EXPORT(stats::StatsReq); BOOST_CLASS_EXPORT(stats::StatsRes); +BOOST_CLASS_EXPORT(stats::BatchStatsReq); +BOOST_CLASS_EXPORT(stats::BatchStatsRes); diff --git a/src/stats/stats.cpp b/src/stats/stats.cpp index 7475bfdad..0d76a4ddf 100644 --- a/src/stats/stats.cpp +++ b/src/stats/stats.cpp @@ -3,8 +3,10 @@ #include "communication/rpc/client.hpp" #include "stats/stats.hpp" -DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address."); -DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port."); +DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address"); +DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port"); +DEFINE_HIDDEN_int32(statsd_flush_interval, 500, + "Stats flush interval (in milliseconds)"); namespace stats { @@ -19,34 +21,52 @@ void StatsClient::Log(StatsReq req) { } void StatsClient::Start(const io::network::Endpoint &endpoint) { + // TODO(mtomic): we probably want to batch based on request size and MTU + const int MAX_BATCH_SIZE = 100; + dispatch_thread_ = std::thread([this, endpoint]() { CHECK(!is_running_) << "Stats logging already initialized!"; LOG(INFO) << "Stats dispatcher thread started"; communication::rpc::Client client(endpoint, kStatsServiceName); + BatchStatsReq batch_request; + batch_request.requests.reserve(MAX_BATCH_SIZE); + is_running_ = true; while (is_running_) { auto last = queue_.begin(); size_t sent = 0, total = 0; - for (auto it = last; it != queue_.end(); it++) { - // TODO (buda): batch messages - // TODO (buda): set reasonable timeout - if (auto rep = client.Call(*it)) { - ++sent; + + auto flush_batch = [&] { + if (auto rep = client.Call(batch_request)) { + sent += batch_request.requests.size(); + } + total += batch_request.requests.size(); + batch_request.requests.clear(); + }; + + for (auto it = last; it != queue_.end(); it++) { + batch_request.requests.emplace_back(std::move(*it)); + if (batch_request.requests.size() == MAX_BATCH_SIZE) { + flush_batch(); } - ++total; } - LOG(INFO) << fmt::format("Sent {} out of {} events from queue.", sent, - total); + + if (!batch_request.requests.empty()) { + flush_batch(); + } + + VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent, + total); last.delete_tail(); - // TODO (buda): parametrize sleep time - std::this_thread::sleep_for(std::chrono::microseconds(500)); + std::this_thread::sleep_for( + std::chrono::milliseconds(FLAGS_statsd_flush_interval)); } }); } -StatsClient::~StatsClient() { +void StatsClient::Stop() { if (is_running_) { is_running_ = false; dispatch_thread_.join(); @@ -63,6 +83,8 @@ void InitStatsLogging() { } } +void StopStatsLogging() { stats::client.Stop(); } + void LogStat(const std::string &metric_path, double value, const std::vector> &tags) { stats::client.Log({metric_path, tags, value}); diff --git a/src/stats/stats.hpp b/src/stats/stats.hpp index b1ad29bc4..c50ce0951 100644 --- a/src/stats/stats.hpp +++ b/src/stats/stats.hpp @@ -8,6 +8,10 @@ #include "io/network/endpoint.hpp" #include "stats/stats_rpc_messages.hpp" +DECLARE_string(statsd_address); +DECLARE_int32(statsd_port); +DECLARE_int32(statsd_flush_interval); + namespace stats { // TODO (buda): documentation + tests @@ -24,8 +28,7 @@ class StatsClient { void Log(StatsReq req); void Start(const io::network::Endpoint &endpoint); - - ~StatsClient(); + void Stop(); private: ConcurrentPushQueue queue_; @@ -38,5 +41,6 @@ class StatsClient { // TODO (buda): ON/OFF compile OR runtime parameter? void InitStatsLogging(); +void StopStatsLogging(); void LogStat(const std::string &metric_path, double value, const std::vector> &tags = {}); diff --git a/src/stats/stats_rpc_messages.hpp b/src/stats/stats_rpc_messages.hpp index a7ec22948..88ee578b3 100644 --- a/src/stats/stats_rpc_messages.hpp +++ b/src/stats/stats_rpc_messages.hpp @@ -37,6 +37,26 @@ struct StatsReq : public communication::rpc::Message { RPC_NO_MEMBER_MESSAGE(StatsRes); +struct BatchStatsReq : public communication::rpc::Message { + BatchStatsReq() {} + BatchStatsReq(std::vector requests) : requests(requests) {} + + std::vector requests; + + private: + friend class boost::serialization::access; + + template + void serialize(TArchive &ar, unsigned int) { + ar &boost::serialization::base_object(*this); + ar &requests; + } +}; + +RPC_NO_MEMBER_MESSAGE(BatchStatsRes); + using StatsRpc = communication::rpc::RequestResponse; +using BatchStatsRpc = + communication::rpc::RequestResponse; } // namespace stats diff --git a/tools/src/mg_statsd/main.cpp b/tools/src/mg_statsd/main.cpp index ce5342ae8..081be168b 100644 --- a/tools/src/mg_statsd/main.cpp +++ b/tools/src/mg_statsd/main.cpp @@ -1,11 +1,15 @@ #include "gflags/gflags.h" #include "communication/rpc/server.hpp" +#include "io/network/socket.hpp" #include "stats/stats.hpp" DEFINE_string(address, "", "address"); DEFINE_int32(port, 2500, "port"); +DEFINE_string(graphite_address, "", "Graphite address"); +DEFINE_int32(graphite_port, 0, "port"); + std::string GraphiteFormat(const stats::StatsReq &req) { std::stringstream sstr; sstr << req.metric_path; @@ -22,12 +26,28 @@ int main(int argc, char *argv[]) { communication::rpc::System system({FLAGS_address, (uint16_t)FLAGS_port}); communication::rpc::Server server(system, "stats"); - server.Register([](const stats::StatsReq &req) { - // TODO(mtomic): actually send to graphite - LOG(INFO) << fmt::format("Got message: {}", GraphiteFormat(req)); + io::network::Socket graphite_socket; + + CHECK(graphite_socket.Connect( + {FLAGS_graphite_address, (uint16_t)FLAGS_graphite_port})) + << "Failed to connect to Graphite"; + graphite_socket.SetKeepAlive(); + + server.Register([&](const stats::StatsReq &req) { + std::string data = GraphiteFormat(req); + graphite_socket.Write(data); return std::make_unique(); }); + server.Register([&](const stats::BatchStatsReq &req) { + // TODO(mtomic): batching? + for (size_t i = 0; i < req.requests.size(); ++i) { + std::string data = GraphiteFormat(req.requests[i]); + graphite_socket.Write(data, i + 1 < req.requests.size()); + } + return std::make_unique(); + }); + std::this_thread::sleep_until(std::chrono::system_clock::time_point::max()); return 0;