diff --git a/src/memgraph_bolt.cpp b/src/memgraph_bolt.cpp index a10ef36b6..e77ee28dc 100644 --- a/src/memgraph_bolt.cpp +++ b/src/memgraph_bolt.cpp @@ -167,6 +167,7 @@ int main(int argc, char **argv) { std::set_terminate(&terminate_handler); InitStatsLogging(); + utils::OnScopeExit stop_stats([] { StopStatsLogging(); }); CHECK(!(FLAGS_master && FLAGS_worker)) << "Can't run Memgraph as worker and master at the same time"; diff --git a/src/utils/network.cpp b/src/utils/network.cpp index 4149345f1..ebd198dd1 100644 --- a/src/utils/network.cpp +++ b/src/utils/network.cpp @@ -3,6 +3,8 @@ #include <arpa/inet.h> #include <netdb.h> +#include <climits> +#include <cstdlib> #include <cstring> #include <string> @@ -41,4 +43,12 @@ std::string ResolveHostname(std::string hostname) { return address; } +/// Gets hostname +std::experimental::optional<std::string> GetHostname() { + char hostname[HOST_NAME_MAX + 1]; + int result = gethostname(hostname, sizeof(hostname)); + if (result) return std::experimental::nullopt; + return std::string(hostname); +} + }; // namespace utils diff --git a/src/utils/network.hpp b/src/utils/network.hpp index 7e0333a1b..7a28b4954 100644 --- a/src/utils/network.hpp +++ b/src/utils/network.hpp @@ -1,5 +1,6 @@ #pragma once +#include <experimental/optional> #include <string> namespace utils { @@ -7,4 +8,7 @@ namespace utils { /// Resolves hostname to ip, if already an ip, just returns it std::string ResolveHostname(std::string hostname); +/// Gets hostname +std::experimental::optional<std::string> GetHostname(); + }; // namespace utils diff --git a/tests/macro_benchmark/clients/card_fraud_client.cpp b/tests/macro_benchmark/clients/card_fraud_client.cpp index e67498175..ce2eb5dfd 100644 --- a/tests/macro_benchmark/clients/card_fraud_client.cpp +++ b/tests/macro_benchmark/clients/card_fraud_client.cpp @@ -5,6 +5,17 @@ #include "gflags/gflags.h" #include "long_running_common.hpp" +#include "stats/stats.hpp" + +// TODO(mtomic): this sucks but I don't know a different way to make it work +#include "boost/archive/binary_iarchive.hpp" +#include "boost/archive/binary_oarchive.hpp" +#include "boost/serialization/export.hpp" +BOOST_CLASS_EXPORT(stats::StatsReq); +BOOST_CLASS_EXPORT(stats::StatsRes); +BOOST_CLASS_EXPORT(stats::BatchStatsReq); +BOOST_CLASS_EXPORT(stats::BatchStatsRes); + class CardFraudClient : public TestClient { public: @@ -97,6 +108,8 @@ int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); google::InitGoogleLogging(argv[0]); + InitStatsLogging(); + nlohmann::json config; std::cin >> config; @@ -106,6 +119,8 @@ int main(int argc, char **argv) { CreateIndex(client, "Pos", "id"); CreateIndex(client, "Transaction", "fraud_reported"); + LOG(INFO) << "Done building indexes."; + std::vector<std::unique_ptr<TestClient>> clients; for (int i = 0; i < FLAGS_num_workers; ++i) { clients.emplace_back(std::make_unique<CardFraudClient>(i, num_pos, config)); @@ -113,6 +128,7 @@ int main(int argc, char **argv) { RunMultithreadedTest(clients); - return 0; + StopStatsLogging(); + return 0; } diff --git a/tests/macro_benchmark/clients/long_running_common.hpp b/tests/macro_benchmark/clients/long_running_common.hpp index 35fb51feb..f00f23c15 100644 --- a/tests/macro_benchmark/clients/long_running_common.hpp +++ b/tests/macro_benchmark/clients/long_running_common.hpp @@ -1,8 +1,12 @@ +#pragma once + #include "json/json.hpp" #include "bolt_client.hpp" #include "common.hpp" #include "communication/bolt/v1/decoder/decoded_value.hpp" +#include "stats/stats.hpp" +#include "utils/network.hpp" #include "utils/timer.hpp" const int MAX_RETRIES = 30; @@ -15,6 +19,9 @@ DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); DEFINE_int32(duration, 30, "Number of seconds to execute benchmark"); +DEFINE_string(group, "unknown", "Test group name"); +DEFINE_string(scenario, "unknown", "Test scenario name"); + class TestClient { public: TestClient() @@ -78,6 +85,13 @@ class TestClient { }; void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) { + static const auto HOSTNAME = + utils::GetHostname().value_or("unknown_hostname"); + static const auto TEST_PREFIX = fmt::format("{}.long_running.{}.{}", HOSTNAME, + FLAGS_group, FLAGS_scenario); + static const auto EXECUTED_QUERIES = + fmt::format("{}.executed_queries", TEST_PREFIX); + CHECK((int)clients.size() == FLAGS_num_workers); // Open stream for writing stats. @@ -120,10 +134,7 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) { // little bit chaotic. Think about refactoring this part to only use json // and write DecodedValue to json converter. const std::vector<std::string> fields = { - "wall_time", - "parsing_time", - "planning_time", - "plan_execution_time", + "wall_time", "parsing_time", "planning_time", "plan_execution_time", }; for (const auto &query_stats : stats) { std::map<std::string, double> new_aggregated_query_stats; @@ -150,16 +161,17 @@ void RunMultithreadedTest(std::vector<std::unique_ptr<TestClient>> &clients) { } } + LogStat(EXECUTED_QUERIES, executed_queries); + out << "{\"num_executed_queries\": " << executed_queries << ", " << "\"elapsed_time\": " << timer.Elapsed().count() << ", \"queries\": ["; - utils::PrintIterable( - out, aggregated_stats, ", ", [](auto &stream, const auto &x) { - stream << "{\"query\": " << nlohmann::json(x.first) - << ", \"stats\": "; - PrintJsonDecodedValue(stream, DecodedValue(x.second)); - stream << "}"; - }); + utils::PrintIterable(out, aggregated_stats, ", ", [](auto &stream, + const auto &x) { + stream << "{\"query\": " << nlohmann::json(x.first) << ", \"stats\": "; + PrintJsonDecodedValue(stream, DecodedValue(x.second)); + stream << "}"; + }); out << "]}" << std::endl; out.flush(); std::this_thread::sleep_for(1s); diff --git a/tools/src/mg_statsd/main.cpp b/tools/src/mg_statsd/main.cpp index 081be168b..1c8fafec8 100644 --- a/tools/src/mg_statsd/main.cpp +++ b/tools/src/mg_statsd/main.cpp @@ -3,12 +3,15 @@ #include "communication/rpc/server.hpp" #include "io/network/socket.hpp" #include "stats/stats.hpp" +#include "utils/flag_validation.hpp" -DEFINE_string(address, "", "address"); -DEFINE_int32(port, 2500, "port"); +DEFINE_string(interface, "0.0.0.0", + "Communication interface on which to listen."); +DEFINE_VALIDATED_int32(port, 2500, "Communication port on which to listen.", + FLAG_IN_RANGE(0, std::numeric_limits<uint16_t>::max())); -DEFINE_string(graphite_address, "", "Graphite address"); -DEFINE_int32(graphite_port, 0, "port"); +DEFINE_string(graphite_address, "", "Graphite address."); +DEFINE_int32(graphite_port, 0, "Graphite port."); std::string GraphiteFormat(const stats::StatsReq &req) { std::stringstream sstr; @@ -23,7 +26,7 @@ std::string GraphiteFormat(const stats::StatsReq &req) { int main(int argc, char *argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); - communication::rpc::System system({FLAGS_address, (uint16_t)FLAGS_port}); + communication::rpc::System system({FLAGS_interface, (uint16_t)FLAGS_port}); communication::rpc::Server server(system, "stats"); io::network::Socket graphite_socket; @@ -40,7 +43,7 @@ int main(int argc, char *argv[]) { }); server.Register<stats::BatchStatsRpc>([&](const stats::BatchStatsReq &req) { - // TODO(mtomic): batching? + // TODO(mtomic): batching? for (size_t i = 0; i < req.requests.size(); ++i) { std::string data = GraphiteFormat(req.requests[i]); graphite_socket.Write(data, i + 1 < req.requests.size());