Remove statsd

Reviewers: teon.banek

Reviewed By: teon.banek

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2585
This commit is contained in:
Matej Ferencevic 2019-12-06 14:41:03 +01:00
parent 8a0abd6dbd
commit 8da71873a7
17 changed files with 14 additions and 688 deletions

View File

@ -6,7 +6,6 @@
- build_release/memgraph_ha
- build_release/tools/src/mg_client
- build_release/tools/src/mg_import_csv
- build_release/tools/src/mg_statsd
- config
filename: binaries.tar.gz

View File

@ -7,7 +7,6 @@ add_subdirectory(requests)
add_subdirectory(io)
add_subdirectory(telemetry)
add_subdirectory(communication)
add_subdirectory(stats)
add_subdirectory(auth)
add_subdirectory(slk)
add_subdirectory(storage/v2)

View File

@ -1,13 +0,0 @@
set(stats_src_files
metrics.cpp
stats.cpp)
define_add_lcp(add_lcp stats_src_files stats_lcp_files)
add_lcp(stats_rpc_messages.lcp SLK_SERIALIZE)
add_custom_target(generate_stats_lcp DEPENDS ${stats_lcp_files})
add_library(mg-stats STATIC ${stats_src_files})
target_link_libraries(mg-stats Threads::Threads mg-utils mg-io mg-comm-rpc fmt glog gflags)
add_dependencies(mg-stats generate_stats_lcp)

View File

@ -1,103 +0,0 @@
#include "stats/metrics.hpp"
#include <tuple>
#include "fmt/format.h"
#include "glog/logging.h"
namespace stats {
std::mutex &MetricsMutex() {
static std::mutex mutex;
return mutex;
}
std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics() {
static std::map<std::string, std::unique_ptr<Metric>> metrics;
MetricsMutex().lock();
return metrics;
}
void ReleaseMetrics() { MetricsMutex().unlock(); }
Metric::Metric(int64_t start_value) : value_(start_value) {}
Counter::Counter(int64_t start_value) : Metric(start_value) {}
void Counter::Bump(int64_t delta) { value_ += delta; }
std::optional<int64_t> Counter::Flush() { return value_; }
int64_t Counter::Value() { return value_; }
Gauge::Gauge(int64_t start_value) : Metric(start_value) {}
void Gauge::Set(int64_t value) { value_ = value; }
std::optional<int64_t> Gauge::Flush() { return value_; }
IntervalMin::IntervalMin(int64_t start_value) : Metric(start_value) {}
void IntervalMin::Add(int64_t value) {
int64_t curr = value_;
while (curr > value && !value_.compare_exchange_weak(curr, value))
;
}
std::optional<int64_t> IntervalMin::Flush() {
int64_t curr = value_;
value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::max());
return curr == std::numeric_limits<int64_t>::max() ? std::nullopt
: std::make_optional(curr);
}
IntervalMax::IntervalMax(int64_t start_value) : Metric(start_value) {}
void IntervalMax::Add(int64_t value) {
int64_t curr = value_;
while (curr < value && !value_.compare_exchange_weak(curr, value))
;
}
std::optional<int64_t> IntervalMax::Flush() {
int64_t curr = value_;
value_.compare_exchange_weak(curr, std::numeric_limits<int64_t>::min());
return curr == std::numeric_limits<int64_t>::min() ? std::nullopt
: std::make_optional(curr);
}
template <class T>
T &GetMetric(const std::string &name, int64_t start_value) {
auto &metrics = AccessMetrics();
auto it = metrics.find(name);
if (it == metrics.end()) {
auto got = metrics.emplace(name, std::make_unique<T>(start_value));
CHECK(got.second) << "Failed to create counter " << name;
it = got.first;
}
ReleaseMetrics();
auto *ptr = dynamic_cast<T *>(it->second.get());
if (!ptr) {
LOG(FATAL) << fmt::format("GetMetric({}) called with invalid metric type",
name);
}
return *ptr;
}
Counter &GetCounter(const std::string &name, int64_t start_value) {
return GetMetric<Counter>(name, start_value);
}
Gauge &GetGauge(const std::string &name, int64_t start_value) {
return GetMetric<Gauge>(name, start_value);
}
IntervalMin &GetIntervalMin(const std::string &name) {
return GetMetric<IntervalMin>(name, std::numeric_limits<int64_t>::max());
}
IntervalMax &GetIntervalMax(const std::string &name) {
return GetMetric<IntervalMax>(name, std::numeric_limits<int64_t>::min());
}
} // namespace stats

View File

@ -1,202 +0,0 @@
/**
* @file
*
* This file contains some metrics types that can be aggregated on client side
* and periodically flushed to StatsD.
*/
#pragma once
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include "fmt/format.h"
namespace stats {
// TODO(mtomic): it would probably be nice to have Value method for every metric
// type, however, there is no use case for this yet
/**
* Abstract base class for all metrics.
*/
class Metric {
public:
/**
* Constructs a metric to be exported to StatsD.
*
* @param name metric will be exported to StatsD with this path
* @param value initial value
*/
virtual ~Metric() {}
/**
* Metric refresh thread will periodically call this function. It should
* return the metric value aggregated since the last flush call or nullopt
* if there were no updates.
*/
virtual std::optional<int64_t> Flush() = 0;
explicit Metric(int64_t start_value = 0);
protected:
std::atomic<int64_t> value_;
};
/**
* A simple counter.
*/
class Counter : public Metric {
public:
explicit Counter(int64_t start_value = 0);
/**
* Change counter value by delta.
*
* @param delta value change
*/
void Bump(int64_t delta = 1);
/** Returns the current value of the counter. **/
std::optional<int64_t> Flush() override;
/** Returns the current value of the counter. **/
int64_t Value();
friend Counter &GetCounter(const std::string &name);
};
/**
* To be used instead of Counter constructor. If counter with this name doesn't
* exist, it will be initialized with start_value.
*
* @param name counter name
* @param start_value start value
*/
Counter &GetCounter(const std::string &name, int64_t start_value = 0);
/**
* A simple gauge. Gauge value is explicitly set, instead of being added to or
* subtracted from.
*/
class Gauge : public Metric {
public:
explicit Gauge(int64_t start_value = 0);
/**
* Set gauge value.
*
* @param value value to be set
*/
void Set(int64_t value);
/** Returns the current gauge value. **/
std::optional<int64_t> Flush() override;
};
/**
* To be used instead of Gauge constructor. If gauge with this name doesn't
* exist, it will be initialized with start_value.
*
* @param name gauge name
* @param start_value start value
*/
Gauge &GetGauge(const std::string &name, int64_t start_value = 0);
/**
* Aggregates minimum between two flush periods.
*/
class IntervalMin : public Metric {
public:
explicit IntervalMin(int64_t start_value);
/**
* Add another value into the minimum computation.
*
* @param value value to be added
*/
void Add(int64_t value);
/**
* Returns the minimum value encountered since the last flush period,
* or nullopt if no values were added.
*/
std::optional<int64_t> Flush() override;
};
/**
* To be used instead of IntervalMin constructor.
*
* @param name interval min name
*/
IntervalMin &GetIntervalMin(const std::string &name);
/**
* Aggregates maximum betweenw two flush periods.
*/
class IntervalMax : public Metric {
public:
explicit IntervalMax(int64_t start_value);
/**
* Add another value into the maximum computation.
*/
void Add(int64_t value);
/**
* Returns the maximum value encountered since the last flush period,
* or nullopt if no values were added.
*/
std::optional<int64_t> Flush() override;
};
/**
* To be used instead of IntervalMax constructor.
*
* @param name interval max name
*/
IntervalMax &GetIntervalMax(const std::string &name);
/**
* A stopwatch utility. It exports 4 metrics: total time measured since the
* beginning of the program, total number of times time intervals measured,
* minimum and maximum time interval measured since the last metric flush.
* Metrics exported by the stopwatch will be named
* [name].{total_time|count|min|max}.
*
* @param name timed event name
* @param f Callable, an action to be performed.
*/
template <class Function>
int64_t Stopwatch(const std::string &name, Function f) {
auto &total_time = GetCounter(fmt::format("{}.total_time", name));
auto &count = GetCounter(fmt::format("{}.count", name));
auto &min = GetIntervalMin(fmt::format("{}.min", name));
auto &max = GetIntervalMax(fmt::format("{}.max", name));
auto start = std::chrono::system_clock::now();
f();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - start)
.count();
total_time.Bump(duration);
count.Bump();
min.Add(duration);
max.Add(duration);
return duration;
}
/**
* Access internal metric list. You probably don't want to use this,
* but if you do, make sure to call ReleaseMetrics when you're done.
*/
std::map<std::string, std::unique_ptr<Metric>> &AccessMetrics();
/**
* Releases internal lock on metric list.
*/
void ReleaseMetrics();
} // namespace stats

View File

@ -1,118 +0,0 @@
#include "stats/stats.hpp"
#include "glog/logging.h"
#include "communication/rpc/client.hpp"
#include "data_structures/concurrent/push_queue.hpp"
#include "stats/metrics.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "utils/thread.hpp"
DEFINE_HIDDEN_string(statsd_address, "", "Stats server IP address");
DEFINE_HIDDEN_int32(statsd_port, 2500, "Stats server port");
DEFINE_HIDDEN_int32(statsd_flush_interval, 500,
"Stats flush interval (in milliseconds)");
namespace stats {
std::string statsd_prefix = "";
std::thread stats_dispatch_thread;
std::thread counter_refresh_thread;
std::atomic<bool> stats_running{false};
ConcurrentPushQueue<StatsReq> stats_queue;
void RefreshMetrics() {
LOG(INFO) << "Metrics flush thread started";
utils::ThreadSetName("Stats refresh");
while (stats_running) {
auto &metrics = AccessMetrics();
for (auto &kv : metrics) {
auto value = kv.second->Flush();
if (value) {
LogStat(kv.first, *value);
}
}
ReleaseMetrics();
// TODO(mtomic): hardcoded sleep time
std::this_thread::sleep_for(std::chrono::seconds(1));
}
LOG(INFO) << "Metrics flush thread stopped";
}
void StatsDispatchMain(const io::network::Endpoint &endpoint) {
// TODO(mtomic): we probably want to batch based on request size and MTU
const int MAX_BATCH_SIZE = 100;
LOG(INFO) << "Stats dispatcher thread started";
utils::ThreadSetName("Stats dispatcher");
// TODO(mferencevic): stats are currently hardcoded not to use SSL
communication::ClientContext client_context;
communication::rpc::Client client(endpoint, &client_context);
BatchStatsReq batch_request;
batch_request.requests.reserve(MAX_BATCH_SIZE);
while (stats_running) {
auto last = stats_queue.begin();
size_t sent = 0, total = 0;
auto flush_batch = [&] {
try {
client.Call<BatchStatsRpc>(batch_request);
sent += batch_request.requests.size();
} catch (const communication::rpc::RpcFailedException &) {
DLOG(WARNING) << "BatchStatsRpc failed!";
}
total += batch_request.requests.size();
batch_request.requests.clear();
};
for (auto it = last; it != stats_queue.end(); it++) {
batch_request.requests.emplace_back(std::move(*it));
if (batch_request.requests.size() == MAX_BATCH_SIZE) {
flush_batch();
}
}
if (!batch_request.requests.empty()) {
flush_batch();
}
VLOG(30) << fmt::format("Sent {} out of {} events from queue.", sent,
total);
last.delete_tail();
std::this_thread::sleep_for(
std::chrono::milliseconds(FLAGS_statsd_flush_interval));
}
}
void LogStat(const std::string &metric_path, double value,
const std::vector<std::pair<std::string, std::string>> &tags) {
if (stats_running) {
stats_queue.push(statsd_prefix + metric_path, tags, value);
}
}
void InitStatsLogging(std::string prefix) {
if (!prefix.empty()) {
statsd_prefix = prefix + ".";
}
if (FLAGS_statsd_address != "") {
stats_running = true;
stats_dispatch_thread = std::thread(
StatsDispatchMain, io::network::Endpoint{FLAGS_statsd_address,
(uint16_t)FLAGS_statsd_port});
counter_refresh_thread = std::thread(RefreshMetrics);
}
}
void StopStatsLogging() {
if (stats_running) {
stats_running = false;
stats_dispatch_thread.join();
counter_refresh_thread.join();
}
}
} // namespace stats

View File

@ -1,33 +0,0 @@
/// @file
#pragma once
#include <thread>
#include <vector>
#include "gflags/gflags.h"
#include "io/network/endpoint.hpp"
namespace stats {
/**
* Start sending metrics to StatsD server.
*
* @param prefix prefix to prepend to exported keys
*/
void InitStatsLogging(std::string prefix = "");
/**
* Stop sending metrics to StatsD server. This should be called before exiting
* program.
*/
void StopStatsLogging();
/**
* Send a value to StatsD with current timestamp.
*/
void LogStat(const std::string &metric_path, double value,
const std::vector<std::pair<std::string, std::string>> &tags = {});
} // namespace stats

View File

@ -1,25 +0,0 @@
#>cpp
#pragma once
#include "communication/rpc/messages.hpp"
#include "slk/serialization.hpp"
#include "utils/timestamp.hpp"
cpp<#
(lcp:namespace stats)
(lcp:define-rpc stats
(:request
((metric-path "std::string")
(tags "std::vector<std::pair<std::string, std::string>>")
(value :double)
(timestamp :uint64_t :initarg nil
:initval "static_cast<uint64_t>(utils::Timestamp::Now().SecSinceTheEpoch())")))
(:response ()))
(lcp:define-rpc batch-stats
(:request
((requests "std::vector<StatsReq>")))
(:response ()))
(lcp:pop-namespace) ;; stats

View File

@ -15,16 +15,16 @@ function(add_macro_benchmark test_cpp)
endfunction(add_macro_benchmark)
add_macro_benchmark(clients/pokec_client.cpp)
target_link_libraries(${test_prefix}pokec_client mg-communication mg-io mg-utils mg-stats json)
target_link_libraries(${test_prefix}pokec_client mg-communication mg-io mg-utils json)
add_macro_benchmark(clients/graph_500_bfs.cpp)
target_link_libraries(${test_prefix}graph_500_bfs mg-communication mg-comm-rpc mg-io mg-utils mg-stats json)
target_link_libraries(${test_prefix}graph_500_bfs mg-communication mg-io mg-utils json)
add_macro_benchmark(clients/bfs_pokec_client.cpp)
target_link_libraries(${test_prefix}bfs_pokec_client mg-communication mg-io mg-utils mg-stats json)
target_link_libraries(${test_prefix}bfs_pokec_client mg-communication mg-io mg-utils json)
add_macro_benchmark(clients/query_client.cpp)
target_link_libraries(${test_prefix}query_client mg-communication mg-io mg-utils)
add_macro_benchmark(clients/card_fraud_client.cpp)
target_link_libraries(${test_prefix}card_fraud_client mg-communication mg-comm-rpc mg-io mg-utils mg-stats json)
target_link_libraries(${test_prefix}card_fraud_client mg-communication mg-io mg-utils json)

View File

@ -5,9 +5,6 @@
#include "gflags/gflags.h"
#include "communication/rpc/client.hpp"
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "utils/rw_lock.hpp"
#include "long_running_common.hpp"
@ -23,14 +20,6 @@ DEFINE_string(config, "", "test config");
enum class Role { WORKER, ANALYTIC, CLEANUP };
stats::Gauge &num_vertices = stats::GetGauge("vertices");
stats::Gauge &num_edges = stats::GetGauge("edges");
void UpdateStats() {
num_vertices.Set(num_pos + num_cards + num_transactions);
num_edges.Set(2 * num_transactions);
}
int64_t NumNodesWithLabel(Client &client, std::string label) {
std::string query = fmt::format("MATCH (u :{}) RETURN count(u)", label);
auto result = ExecuteNTimesTillSuccess(client, query, {}, MAX_RETRIES);
@ -176,7 +165,6 @@ class CardFraudClient : public TestClient {
card_id, tx_id, pos_id);
num_transactions++;
UpdateStats();
}
int64_t UniformInt(int64_t a, int64_t b) {
@ -261,7 +249,6 @@ class CardFraudClient : public TestClient {
num_transactions, num_transactions_db, deleted,
num_transactions - num_transactions_db);
num_transactions = num_transactions_db;
UpdateStats();
}
std::this_thread::sleep_for(
@ -334,9 +321,6 @@ int main(int argc, char **argv) {
communication::Init();
stats::InitStatsLogging(
fmt::format("client.long_running.{}.{}", FLAGS_group, FLAGS_scenario));
Endpoint endpoint(FLAGS_address, FLAGS_port);
ClientContext context(FLAGS_use_ssl);
Client client(&context);
@ -382,7 +366,5 @@ int main(int argc, char **argv) {
RunMultithreadedTest(clients);
stats::StopStatsLogging();
return 0;
}

View File

@ -6,8 +6,6 @@
#include "gflags/gflags.h"
#include "long_running_common.hpp"
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
class Graph500BfsClient : public TestClient {
public:
@ -55,7 +53,5 @@ int main(int argc, char **argv) {
RunMultithreadedTest(clients);
stats::StopStatsLogging();
return 0;
}

View File

@ -14,8 +14,6 @@
#include "json/json.hpp"
#include "stats/metrics.hpp"
#include "stats/stats.hpp"
#include "utils/timer.hpp"
#include "common.hpp"
@ -35,9 +33,9 @@ DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
DEFINE_string(group, "unknown", "Test group name");
DEFINE_string(scenario, "unknown", "Test scenario name");
auto &executed_queries = stats::GetCounter("executed_queries");
auto &executed_steps = stats::GetCounter("executed_steps");
auto &serialization_errors = stats::GetCounter("serialization_errors");
std::atomic<uint64_t> executed_queries{0};
std::atomic<uint64_t> executed_steps{0};
std::atomic<uint64_t> serialization_errors{0};
class TestClient {
public:
@ -59,7 +57,7 @@ class TestClient {
runner_thread_ = std::thread([&] {
while (keep_running_) {
Step();
executed_steps.Bump();
++executed_steps;
}
});
}
@ -82,7 +80,7 @@ class TestClient {
std::tie(result, retries) =
ExecuteNTimesTillSuccess(client_, query, params, MAX_RETRIES);
} catch (const utils::BasicException &e) {
serialization_errors.Bump(MAX_RETRIES);
serialization_errors += MAX_RETRIES;
return std::nullopt;
}
auto wall_time = timer.Elapsed();
@ -96,8 +94,8 @@ class TestClient {
stats_[query].push_back(std::move(metadata));
}
}
executed_queries.Bump();
serialization_errors.Bump(retries);
++executed_queries;
serialization_errors += retries;
return result;
}
@ -177,16 +175,11 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
auto it = aggregated_query_stats.insert({stat.first, Value(0.0)}).first;
it->second = (it->second.ValueDouble() * old_count + stat.second) /
(old_count + new_count);
stats::LogStat(
fmt::format("queries.{}.{}", query_stats.first, stat.first),
(stat.second / new_count));
}
stats::LogStat(fmt::format("queries.{}.count", query_stats.first),
new_count);
}
out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
<< "\"num_executed_steps\": " << executed_steps.Value() << ", "
out << "{\"num_executed_queries\": " << executed_queries << ", "
<< "\"num_executed_steps\": " << executed_steps << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
utils::PrintIterable(

View File

@ -78,9 +78,6 @@ target_link_libraries(${test_prefix}interpreter mg-single-node-v2 mg-auth kvstor
add_unit_test(kvstore.cpp)
target_link_libraries(${test_prefix}kvstore kvstore_lib glog)
add_unit_test(metrics.cpp)
target_link_libraries(${test_prefix}metrics mg-stats)
add_unit_test(mvcc.cpp)
target_link_libraries(${test_prefix}mvcc mg-single-node kvstore_dummy_lib)

View File

@ -2,10 +2,6 @@
add_executable(mg_import_csv mg_import_csv/main.cpp)
target_link_libraries(mg_import_csv mg-single-node kvstore_dummy_lib)
# StatsD Target
add_executable(mg_statsd mg_statsd/main.cpp)
target_link_libraries(mg_statsd mg-communication mg-io mg-utils mg-stats)
# Generate a version.hpp file
set(VERSION_STRING ${memgraph_VERSION})
configure_file(../../src/version.hpp.in version.hpp @ONLY)
@ -41,4 +37,4 @@ endif()
install(TARGETS mg_client RUNTIME DESTINATION bin)
# Target for building all the tool executables.
add_custom_target(tools DEPENDS mg_import_csv mg_statsd mg_client mg_dump)
add_custom_target(tools DEPENDS mg_import_csv mg_client mg_dump)

View File

@ -1,76 +0,0 @@
#include "gflags/gflags.h"
#include "communication/rpc/server.hpp"
#include "io/network/socket.hpp"
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "utils/flag_validation.hpp"
DEFINE_string(interface, "0.0.0.0",
"Communication interface on which to listen.");
DEFINE_VALIDATED_int32(port, 2500, "Communication port on which to listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_string(graphite_address, "", "Graphite address.");
DEFINE_int32(graphite_port, 0, "Graphite port.");
DEFINE_string(prefix, "", "Prefix for all collected stats");
std::string GraphiteFormat(const stats::StatsReq &req) {
std::stringstream sstr;
if (!FLAGS_prefix.empty()) {
sstr << FLAGS_prefix << "." << req.metric_path;
} else {
sstr << req.metric_path;
}
for (const auto &tag : req.tags) {
sstr << ";" << tag.first << "=" << tag.second;
}
sstr << " " << req.value << " " << req.timestamp << "\n";
return sstr.str();
}
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
// TODO(mferencevic): stats are currently hardcoded not to use SSL
communication::ServerContext server_context;
communication::rpc::Server server({FLAGS_interface, (uint16_t)FLAGS_port},
&server_context);
io::network::Socket graphite_socket;
CHECK(graphite_socket.Connect(
{FLAGS_graphite_address, (uint16_t)FLAGS_graphite_port}))
<< "Failed to connect to Graphite";
graphite_socket.SetKeepAlive();
server.Register<stats::StatsRpc>(
[&](const auto &req_reader, auto *res_builder) {
stats::StatsReq req;
Load(&req, req_reader);
LOG(INFO) << "StatsRpc::Received";
std::string data = GraphiteFormat(req);
graphite_socket.Write(data);
stats::StatsRes res;
Save(res, res_builder);
});
server.Register<stats::BatchStatsRpc>(
[&](const auto &req_reader, auto *res_builder) {
// TODO(mtomic): batching?
stats::BatchStatsReq req;
Load(&req, req_reader);
LOG(INFO) << fmt::format("BatchStatsRpc::Received: {}",
req.requests.size());
for (size_t i = 0; i < req.requests.size(); ++i) {
std::string data = GraphiteFormat(req.requests[i]);
graphite_socket.Write(data, i + 1 < req.requests.size());
}
stats::BatchStatsRes res;
Save(res, res_builder);
});
std::this_thread::sleep_until(std::chrono::system_clock::time_point::max());
return 0;
}

View File

@ -3,9 +3,6 @@ include_directories(SYSTEM ${GTEST_INCLUDE_DIR})
add_executable(mg_recovery_check mg_recovery_check.cpp)
target_link_libraries(mg_recovery_check mg-single-node gtest gtest_main kvstore_dummy_lib)
add_executable(mg_statsd_client statsd/mg_statsd_client.cpp)
target_link_libraries(mg_statsd_client mg-communication mg-io mg-utils mg-stats)
# Copy CSV data to CMake build dir
configure_file(csv/comment_nodes.csv csv/comment_nodes.csv COPYONLY)
configure_file(csv/comment_nodes_2.csv csv/comment_nodes_2.csv COPYONLY)

View File

@ -1,63 +0,0 @@
#include "gflags/gflags.h"
#include "glog/logging.h"
#include "communication/rpc/client.hpp"
#include "stats/stats.hpp"
#include "stats/stats_rpc_messages.hpp"
#include "utils/string.hpp"
// TODO (buda): Move this logic to a unit test.
bool parse_input(const std::string &s, std::string &metric_path,
std::vector<std::pair<std::string, std::string>> &tags,
double &value) {
auto words = utils::Split(s, " ");
if (words.size() < 2) {
return false;
}
metric_path = words[0];
try {
value = std::stod(words.back());
} catch (std::exception &e) {
return false;
}
tags.clear();
for (size_t i = 1; i < words.size() - 1; ++i) {
auto tag_value = utils::Split(words[i], "=", 1);
if (tag_value.size() != 2) {
return false;
}
// TODO(mtomic): tags probably need to be escaped before sending to graphite
tags.emplace_back(tag_value[0], tag_value[1]);
}
return true;
}
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
LOG(INFO) << "Usage: metric_path tag1=value1 ... tagn=valuen "
"metric_value";
stats::InitStatsLogging();
std::string line;
std::string metric_path;
std::vector<std::pair<std::string, std::string>> tags;
double value;
while (true) {
std::getline(std::cin, line);
if (!parse_input(line, metric_path, tags, value)) {
LOG(ERROR) << "Invalid input";
continue;
}
stats::LogStat(metric_path, value, tags);
}
return 0;
}