Fix statsd and benchmark client
Summary: The stats server wasn't connecting to the right service on
statsd. Also, benchmark client stats now use the prefix `client` instead
of the machine name, to be consistent with memgraph's stats naming, which
starts with `master` or `worker`.

Reviewers: mtomic, buda

Reviewed By: buda

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1209
commit 61967f2e06
parent dca77a6676
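For context, a minimal sketch of how a client-side metric name ends up looking after this change. The group and scenario values are hypothetical, and the assumption that the statsd prefix is joined to the counter name with a dot is mine, not something this diff states:

#include <iostream>
#include <string>

#include <fmt/format.h>

int main() {
  // Hypothetical values for the --group and --scenario flags.
  const std::string group = "pokec";
  const std::string scenario = "reads";

  // Prefix the benchmark client now passes to stats::InitStatsLogging().
  const std::string prefix =
      fmt::format("client.long_running.{}.{}", group, scenario);

  // Assuming the prefix is prepended to counter names with a dot, the
  // executed-queries counter would be published under this full name.
  const std::string full_name = prefix + ".executed_queries";

  std::cout << full_name << std::endl;  // client.long_running.pokec.reads.executed_queries
  return 0;
}

With this naming, client metrics line up next to the `master.*` and `worker.*` metrics that memgraph itself reports.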
@@ -14,8 +14,6 @@ DEFINE_HIDDEN_int32(statsd_flush_interval, 500,
 
 namespace stats {
 
-const std::string kStatsServiceName = "statsd-service";
-
 std::string statsd_prefix = "";
 std::thread stats_dispatch_thread;
 std::thread counter_refresh_thread;
@@ -39,7 +37,6 @@ void RefreshMetrics() {
   LOG(INFO) << "Metrics flush thread stopped";
 }
 
-
 void StatsDispatchMain(const io::network::Endpoint &endpoint) {
   // TODO(mtomic): we probably want to batch based on request size and MTU
   const int MAX_BATCH_SIZE = 100;
@@ -11,6 +11,8 @@
 
 namespace stats {
 
+static const std::string kStatsServiceName = "statsd-service";
+
 /**
  * Start sending metrics to StatsD server.
  *
@@ -108,7 +108,8 @@ int main(int argc, char **argv) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
   google::InitGoogleLogging(argv[0]);
 
-  stats::InitStatsLogging();
+  stats::InitStatsLogging(
+      fmt::format("client.long_running.{}.{}", FLAGS_group, FLAGS_scenario));
 
   nlohmann::json config;
   std::cin >> config;
@@ -23,10 +23,7 @@ DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
 DEFINE_string(group, "unknown", "Test group name");
 DEFINE_string(scenario, "unknown", "Test scenario name");
 
-static const auto EXECUTED_QUERIES =
-    fmt::format("{}.{}.executed_queries", FLAGS_group, FLAGS_scenario);
-
-auto &executed_queries = stats::GetCounter(EXECUTED_QUERIES);
+auto &executed_queries = stats::GetCounter("executed_queries");
 
 class TestClient {
  public:
@@ -131,10 +128,7 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
   // little bit chaotic. Think about refactoring this part to only use json
   // and write DecodedValue to json converter.
   const std::vector<std::string> fields = {
-      "wall_time",
-      "parsing_time",
-      "planning_time",
-      "plan_execution_time",
+      "wall_time", "parsing_time", "planning_time", "plan_execution_time",
   };
   for (const auto &query_stats : stats) {
     std::map<std::string, double> new_aggregated_query_stats;
@@ -164,13 +158,12 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
     out << "{\"num_executed_queries\": " << executed_queries.Value() << ", "
         << "\"elapsed_time\": " << timer.Elapsed().count()
         << ", \"queries\": [";
-    utils::PrintIterable(
-        out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
-          stream << "{\"query\": " << nlohmann::json(x.first)
-                 << ", \"stats\": ";
-          PrintJsonDecodedValue(stream, DecodedValue(x.second));
-          stream << "}";
-        });
+    utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream,
+                                                         const auto &x) {
+      stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
+      PrintJsonDecodedValue(stream, DecodedValue(x.second));
+      stream << "}";
+    });
     out << "]}" << std::endl;
     out.flush();
     std::this_thread::sleep_for(1s);
@@ -28,7 +28,7 @@ int main(int argc, char *argv[]) {
   gflags::ParseCommandLineFlags(&argc, &argv, true);
 
   communication::rpc::System system({FLAGS_interface, (uint16_t)FLAGS_port});
-  communication::rpc::Server server(system, "stats");
+  communication::rpc::Server server(system, stats::kStatsServiceName);
 
   io::network::Socket graphite_socket;
 
@@ -38,6 +38,7 @@ int main(int argc, char *argv[]) {
   graphite_socket.SetKeepAlive();
 
   server.Register<stats::StatsRpc>([&](const stats::StatsReq &req) {
+    LOG(INFO) << "StatsRpc::Received";
     std::string data = GraphiteFormat(req);
     graphite_socket.Write(data);
     return std::make_unique<stats::StatsRes>();
@@ -45,6 +46,8 @@ int main(int argc, char *argv[]) {
 
   server.Register<stats::BatchStatsRpc>([&](const stats::BatchStatsReq &req) {
     // TODO(mtomic): batching?
+    LOG(INFO) << fmt::format("BatchStatsRpc::Received: {}",
+                             req.requests.size());
     for (size_t i = 0; i < req.requests.size(); ++i) {
       std::string data = GraphiteFormat(req.requests[i]);
       graphite_socket.Write(data, i + 1 < req.requests.size());