Stats from card fraud benchmark

Reviewers: mtomic

Reviewed By: mtomic

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1177
This commit is contained in:
Marko Budiselic 2018-02-07 17:52:01 +01:00
parent 81e2e8f64f
commit e2f5bcf96d
6 changed files with 64 additions and 18 deletions

View File

@ -167,6 +167,7 @@ int main(int argc, char **argv) {
std::set_terminate(&terminate_handler);
InitStatsLogging();
utils::OnScopeExit stop_stats([] { StopStatsLogging(); });
CHECK(!(FLAGS_master && FLAGS_worker))
<< "Can't run Memgraph as worker and master at the same time";

View File

@ -3,6 +3,8 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <climits>
#include <cstdlib>
#include <cstring>
#include <string>
@ -41,4 +43,12 @@ std::string ResolveHostname(std::string hostname) {
return address;
}
/// Gets hostname
std::experimental::optional<std::string> GetHostname() {
char hostname[HOST_NAME_MAX + 1];
int result = gethostname(hostname, sizeof(hostname));
if (result) return std::experimental::nullopt;
return std::string(hostname);
}
}; // namespace utils

View File

@ -1,5 +1,6 @@
#pragma once
#include <experimental/optional>
#include <string>
namespace utils {
@ -7,4 +8,7 @@ namespace utils {
/// Resolves hostname to ip, if already an ip, just returns it
std::string ResolveHostname(std::string hostname);
/// Gets hostname
std::experimental::optional<std::string> GetHostname();
}; // namespace utils

View File

@ -5,6 +5,17 @@
#include "gflags/gflags.h"
#include "long_running_common.hpp"
#include "stats/stats.hpp"
// TODO(mtomic): this sucks but I don't know a different way to make it work
#include "boost/archive/binary_iarchive.hpp"
#include "boost/archive/binary_oarchive.hpp"
#include "boost/serialization/export.hpp"
BOOST_CLASS_EXPORT(stats::StatsReq);
BOOST_CLASS_EXPORT(stats::StatsRes);
BOOST_CLASS_EXPORT(stats::BatchStatsReq);
BOOST_CLASS_EXPORT(stats::BatchStatsRes);
class CardFraudClient : public TestClient {
public:
@ -97,6 +108,8 @@ int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
InitStatsLogging();
nlohmann::json config;
std::cin >> config;
@ -106,6 +119,8 @@ int main(int argc, char **argv) {
CreateIndex(client, "Pos", "id");
CreateIndex(client, "Transaction", "fraud_reported");
LOG(INFO) << "Done building indexes.";
std::vector<std::unique_ptr<TestClient>> clients;
for (int i = 0; i < FLAGS_num_workers; ++i) {
clients.emplace_back(std::make_unique<CardFraudClient>(i, num_pos, config));
@ -113,6 +128,7 @@ int main(int argc, char **argv) {
RunMultithreadedTest(clients);
return 0;
StopStatsLogging();
return 0;
}

View File

@ -1,8 +1,12 @@
#pragma once
#include "json/json.hpp"
#include "bolt_client.hpp"
#include "common.hpp"
#include "communication/bolt/v1/decoder/decoded_value.hpp"
#include "stats/stats.hpp"
#include "utils/network.hpp"
#include "utils/timer.hpp"
const int MAX_RETRIES = 30;
@ -15,6 +19,9 @@ DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_int32(duration, 30, "Number of seconds to execute benchmark");
DEFINE_string(group, "unknown", "Test group name");
DEFINE_string(scenario, "unknown", "Test scenario name");
class TestClient {
public:
TestClient()
@ -78,6 +85,13 @@ class TestClient {
};
void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
static const auto HOSTNAME =
utils::GetHostname().value_or("unknown_hostname");
static const auto TEST_PREFIX = fmt::format("{}.long_running.{}.{}", HOSTNAME,
FLAGS_group, FLAGS_scenario);
static const auto EXECUTED_QUERIES =
fmt::format("{}.executed_queries", TEST_PREFIX);
CHECK((int)clients.size() == FLAGS_num_workers);
// Open stream for writing stats.
@ -120,10 +134,7 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
// little bit chaotic. Think about refactoring this part to only use json
// and write DecodedValue to json converter.
const std::vector<std::string> fields = {
"wall_time",
"parsing_time",
"planning_time",
"plan_execution_time",
"wall_time", "parsing_time", "planning_time", "plan_execution_time",
};
for (const auto &query_stats : stats) {
std::map<std::string, double> new_aggregated_query_stats;
@ -150,16 +161,17 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) {
}
}
LogStat(EXECUTED_QUERIES, executed_queries);
out << "{\"num_executed_queries\": " << executed_queries << ", "
<< "\"elapsed_time\": " << timer.Elapsed().count()
<< ", \"queries\": [";
utils::PrintIterable(
out, aggregated_stats, ", ", [](auto &stream, const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first)
<< ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream,
const auto &x) {
stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": ";
PrintJsonDecodedValue(stream, DecodedValue(x.second));
stream << "}";
});
out << "]}" << std::endl;
out.flush();
std::this_thread::sleep_for(1s);

View File

@ -3,12 +3,15 @@
#include "communication/rpc/server.hpp"
#include "io/network/socket.hpp"
#include "stats/stats.hpp"
#include "utils/flag_validation.hpp"
DEFINE_string(address, "", "address");
DEFINE_int32(port, 2500, "port");
DEFINE_string(interface, "0.0.0.0",
"Communication interface on which to listen.");
DEFINE_VALIDATED_int32(port, 2500, "Communication port on which to listen.",
FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max()));
DEFINE_string(graphite_address, "", "Graphite address");
DEFINE_int32(graphite_port, 0, "port");
DEFINE_string(graphite_address, "", "Graphite address.");
DEFINE_int32(graphite_port, 0, "Graphite port.");
std::string GraphiteFormat(const stats::StatsReq &req) {
std::stringstream sstr;
@ -23,7 +26,7 @@ std::string GraphiteFormat(const stats::StatsReq &req) {
int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
communication::rpc::System system({FLAGS_address, (uint16_t)FLAGS_port});
communication::rpc::System system({FLAGS_interface, (uint16_t)FLAGS_port});
communication::rpc::Server server(system, "stats");
io::network::Socket graphite_socket;
@ -40,7 +43,7 @@ int main(int argc, char *argv[]) {
});
server.Register<stats::BatchStatsRpc>([&](const stats::BatchStatsReq &req) {
// TODO(mtomic): batching?
// TODO(mtomic): batching?
for (size_t i = 0; i < req.requests.size(); ++i) {
std::string data = GraphiteFormat(req.requests[i]);
graphite_socket.Write(data, i + 1 < req.requests.size());