Write to Graphite from statsd

Reviewers: buda

Reviewed By: buda

Subscribers: pullbot, mferencevic

Differential Revision: https://phabricator.memgraph.io/D1174
This commit is contained in:
Marin Tomic 2018-02-06 13:35:05 +01:00
parent a66351c3f4
commit 7e7f7ce580
5 changed files with 86 additions and 18 deletions

View File

@ -78,3 +78,5 @@ BOOST_CLASS_EXPORT(distributed::IndexLabelPropertyTx);
// Stats.
// Register the stats RPC message types with Boost.Serialization so they can
// be serialized/deserialized polymorphically through their base
// communication::rpc::Message (BOOST_CLASS_EXPORT emits the export key the
// archive uses to reconstruct the dynamic type).
BOOST_CLASS_EXPORT(stats::StatsReq);
BOOST_CLASS_EXPORT(stats::StatsRes);
// Batched variants, so a client can flush many stats events in one RPC.
BOOST_CLASS_EXPORT(stats::BatchStatsReq);
BOOST_CLASS_EXPORT(stats::BatchStatsRes);

View File

@ -3,8 +3,10 @@
#include "communication/rpc/client.hpp"
#include "stats/stats.hpp"
DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address.");
DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port.");
DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address");
DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port");
DEFINE_HIDDEN_int32(statsd_flush_interval, 500,
"Stats flush interval (in milliseconds)");
namespace stats {
@ -19,34 +21,52 @@ void StatsClient::Log(StatsReq req) {
}
void StatsClient::Start(const io::network::Endpoint &endpoint) {
// TODO(mtomic): we probably want to batch based on request size and MTU
const int MAX_BATCH_SIZE = 100;
dispatch_thread_ = std::thread([this, endpoint]() {
CHECK(!is_running_) << "Stats logging already initialized!";
LOG(INFO) << "Stats dispatcher thread started";
communication::rpc::Client client(endpoint, kStatsServiceName);
BatchStatsReq batch_request;
batch_request.requests.reserve(MAX_BATCH_SIZE);
is_running_ = true;
while (is_running_) {
auto last = queue_.begin();
size_t sent = 0, total = 0;
for (auto it = last; it != queue_.end(); it++) {
// TODO (buda): batch messages
// TODO (buda): set reasonable timeout
if (auto rep = client.Call<StatsRpc>(*it)) {
++sent;
auto flush_batch = [&] {
if (auto rep = client.Call<BatchStatsRpc>(batch_request)) {
sent += batch_request.requests.size();
}
total += batch_request.requests.size();
batch_request.requests.clear();
};
for (auto it = last; it != queue_.end(); it++) {
batch_request.requests.emplace_back(std::move(*it));
if (batch_request.requests.size() == MAX_BATCH_SIZE) {
flush_batch();
}
++total;
}
LOG(INFO) << fmt::format("Sent {} out of {} events from queue.", sent,
total);
if (!batch_request.requests.empty()) {
flush_batch();
}
VLOG(10) << fmt::format("Sent {} out of {} events from queue.", sent,
total);
last.delete_tail();
// TODO (buda): parametrize sleep time
std::this_thread::sleep_for(std::chrono::microseconds(500));
std::this_thread::sleep_for(
std::chrono::milliseconds(FLAGS_statsd_flush_interval));
}
});
}
StatsClient::~StatsClient() {
void StatsClient::Stop() {
if (is_running_) {
is_running_ = false;
dispatch_thread_.join();
@ -63,6 +83,8 @@ void InitStatsLogging() {
}
}
// Stops background stats logging by shutting down the global stats client.
void StopStatsLogging() {
  stats::client.Stop();
}
void LogStat(const std::string &metric_path, double value,
const std::vector<std::pair<std::string, std::string>> &tags) {
stats::client.Log({metric_path, tags, value});

View File

@ -8,6 +8,10 @@
#include "io/network/endpoint.hpp"
#include "stats/stats_rpc_messages.hpp"
DECLARE_string(statsd_address);
DECLARE_int32(statsd_port);
DECLARE_int32(statsd_flush_interval);
namespace stats {
// TODO (buda): documentation + tests
@ -24,8 +28,7 @@ class StatsClient {
void Log(StatsReq req);
void Start(const io::network::Endpoint &endpoint);
~StatsClient();
void Stop();
private:
ConcurrentPushQueue<StatsReq> queue_;
@ -38,5 +41,6 @@ class StatsClient {
// TODO (buda): ON/OFF compile OR runtime parameter?
void InitStatsLogging();
void StopStatsLogging();
void LogStat(const std::string &metric_path, double value,
const std::vector<std::pair<std::string, std::string>> &tags = {});

View File

@ -37,6 +37,26 @@ struct StatsReq : public communication::rpc::Message {
RPC_NO_MEMBER_MESSAGE(StatsRes);
// RPC request carrying a batch of stats events, so a client can flush many
// queued StatsReq messages in a single round-trip instead of one RPC each.
struct BatchStatsReq : public communication::rpc::Message {
  BatchStatsReq() {}
  // Sink constructor: the vector is taken by value and moved into the member.
  // The original copied it (`requests(requests)`), paying for the whole batch
  // twice; std::move makes the by-value parameter a true sink.
  BatchStatsReq(std::vector<StatsReq> requests)
      : requests(std::move(requests)) {}
  std::vector<StatsReq> requests;

 private:
  friend class boost::serialization::access;

  // Serializes the base Message (RPC bookkeeping) followed by the batch.
  template <class TArchive>
  void serialize(TArchive &ar, unsigned int) {
    ar &boost::serialization::base_object<communication::rpc::Message>(*this);
    ar &requests;
  }
};
// Acknowledgement for a batch request; the macro name indicates it carries no
// payload members — TODO confirm against RPC_NO_MEMBER_MESSAGE's definition.
RPC_NO_MEMBER_MESSAGE(BatchStatsRes);
// Request/response pairings registered by both the stats client and server.
using StatsRpc = communication::rpc::RequestResponse<StatsReq, StatsRes>;
using BatchStatsRpc =
communication::rpc::RequestResponse<BatchStatsReq, BatchStatsRes>;
} // namespace stats

View File

@ -1,11 +1,15 @@
#include "gflags/gflags.h"
#include "communication/rpc/server.hpp"
#include "io/network/socket.hpp"
#include "stats/stats.hpp"
// Command-line flags: the RPC endpoint this stats server listens on, and the
// Graphite endpoint it forwards formatted metrics to.
DEFINE_string(address, "", "address");
DEFINE_int32(port, 2500, "port");
DEFINE_string(graphite_address, "", "Graphite address");
// Help text fixed from the bare word "port" to match graphite_address above
// and be meaningful in --help output.
DEFINE_int32(graphite_port, 0, "Graphite port");
std::string GraphiteFormat(const stats::StatsReq &req) {
std::stringstream sstr;
sstr << req.metric_path;
@ -22,12 +26,28 @@ int main(int argc, char *argv[]) {
communication::rpc::System system({FLAGS_address, (uint16_t)FLAGS_port});
communication::rpc::Server server(system, "stats");
server.Register<stats::StatsRpc>([](const stats::StatsReq &req) {
// TODO(mtomic): actually send to graphite
LOG(INFO) << fmt::format("Got message: {}", GraphiteFormat(req));
io::network::Socket graphite_socket;
CHECK(graphite_socket.Connect(
{FLAGS_graphite_address, (uint16_t)FLAGS_graphite_port}))
<< "Failed to connect to Graphite";
graphite_socket.SetKeepAlive();
server.Register<stats::StatsRpc>([&](const stats::StatsReq &req) {
std::string data = GraphiteFormat(req);
graphite_socket.Write(data);
return std::make_unique<stats::StatsRes>();
});
server.Register<stats::BatchStatsRpc>([&](const stats::BatchStatsReq &req) {
// TODO(mtomic): batching?
for (size_t i = 0; i < req.requests.size(); ++i) {
std::string data = GraphiteFormat(req.requests[i]);
graphite_socket.Write(data, i + 1 < req.requests.size());
}
return std::make_unique<stats::BatchStatsRes>();
});
std::this_thread::sleep_until(std::chrono::system_clock::time_point::max());
return 0;