diff --git a/tests/mgbench/client.cpp b/tests/mgbench/client.cpp index 87bdda6c9..c7e002df6 100644 --- a/tests/mgbench/client.cpp +++ b/tests/mgbench/client.cpp @@ -1,4 +1,4 @@ -// Copyright 2022 Memgraph Ltd. +// Copyright 2023 Memgraph Ltd. // // Use of this software is governed by the Business Source License // included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source @@ -16,17 +16,23 @@ #include #include #include +#include +#include #include #include #include #include +#include #include #include "communication/bolt/client.hpp" #include "communication/bolt/v1/value.hpp" #include "communication/init.hpp" +#include "spdlog/formatter.h" +#include "spdlog/spdlog.h" #include "utils/exceptions.hpp" +#include "utils/logging.hpp" #include "utils/string.hpp" #include "utils/timer.hpp" @@ -48,6 +54,10 @@ DEFINE_bool(queries_json, false, DEFINE_string(input, "", "Input file. By default stdin is used."); DEFINE_string(output, "", "Output file. By default stdout is used."); +DEFINE_bool(validation, false, + "Set to true to run client in validation mode." + "Validation mode works for singe query and returns results for validation" + "with metadata"); std::pair, uint64_t> ExecuteNTimesTillSuccess( memgraph::communication::bolt::Client *client, const std::string &query, @@ -55,6 +65,7 @@ std::pair, uint64_t> for (uint64_t i = 0; i < max_attempts; ++i) { try { auto ret = client->Execute(query, params); + return {std::move(ret.metadata), i}; } catch (const memgraph::utils::BasicException &e) { if (i == max_attempts - 1) { @@ -67,6 +78,28 @@ std::pair, uint64_t> LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts); } +// Validation returns results and metadata +std::pair, + std::vector>> +ExecuteValidationNTimesTillSuccess(memgraph::communication::bolt::Client *client, const std::string &query, + const std::map ¶ms, + int max_attempts) { + for (uint64_t i = 0; i < max_attempts; ++i) { + try { + auto ret = client->Execute(query, params); + + return {std::move(ret.metadata), std::move(ret.records)}; + } catch (const memgraph::utils::BasicException &e) { + if (i == max_attempts - 1) { + LOG_FATAL("Could not execute query '{}' {} times! Error message: {}", query, max_attempts, e.what()); + } else { + continue; + } + } + } + LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts); +} + memgraph::communication::bolt::Value JsonToBoltValue(const nlohmann::json &data) { switch (data.type()) { case nlohmann::json::value_t::null: @@ -158,6 +191,35 @@ class Metadata final { std::map storage_; }; +nlohmann::json LatencyStatistics(std::vector> &worker_query_latency) { + nlohmann::json statistics = nlohmann::json::object(); + std::vector query_latency; + for (int i = 0; i < FLAGS_num_workers; i++) { + for (auto &e : worker_query_latency[i]) { + query_latency.push_back(e); + } + } + auto iterations = query_latency.size(); + const int lower_bound = 10; + if (iterations > lower_bound) { + std::sort(query_latency.begin(), query_latency.end()); + statistics["iterations"] = iterations; + statistics["min"] = query_latency.front(); + statistics["max"] = query_latency.back(); + statistics["mean"] = std::accumulate(query_latency.begin(), query_latency.end(), 0.0) / iterations; + statistics["p99"] = query_latency[floor(iterations * 0.99)]; + statistics["p95"] = query_latency[floor(iterations * 0.95)]; + statistics["p90"] = query_latency[floor(iterations * 0.90)]; + statistics["p75"] = query_latency[floor(iterations * 0.75)]; + statistics["p50"] = query_latency[floor(iterations * 0.50)]; + + } else { + spdlog::info("To few iterations to calculate latency values!"); + statistics["iterations"] = iterations; + } + return statistics; +} + void Execute( const std::vector>> &queries, std::ostream *stream) { @@ -167,6 +229,7 @@ void Execute( std::vector worker_retries(FLAGS_num_workers, 0); std::vector worker_metadata(FLAGS_num_workers, Metadata()); std::vector worker_duration(FLAGS_num_workers, 0.0); + std::vector> worker_query_durations(FLAGS_num_workers); // Start workers and execute queries. auto size = queries.size(); @@ -187,16 +250,20 @@ void Execute( auto &retries = worker_retries[worker]; auto &metadata = worker_metadata[worker]; auto &duration = worker_duration[worker]; - memgraph::utils::Timer timer; + auto &query_duration = worker_query_durations[worker]; + + memgraph::utils::Timer worker_timer; while (true) { auto pos = position.fetch_add(1, std::memory_order_acq_rel); if (pos >= size) break; const auto &query = queries[pos]; + memgraph::utils::Timer query_timer; auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries); + query_duration.push_back(query_timer.Elapsed().count()); retries += ret.second; metadata.Append(ret.first); } - duration = timer.Elapsed().count(); + duration = worker_timer.Elapsed().count(); client.Close(); })); } @@ -218,6 +285,7 @@ void Execute( final_retries += worker_retries[i]; final_duration += worker_duration[i]; } + final_duration /= FLAGS_num_workers; nlohmann::json summary = nlohmann::json::object(); summary["count"] = queries.size(); @@ -226,12 +294,76 @@ void Execute( summary["retries"] = final_retries; summary["metadata"] = final_metadata.Export(); summary["num_workers"] = FLAGS_num_workers; + summary["latency_stats"] = LatencyStatistics(worker_query_durations); + (*stream) << summary.dump() << std::endl; +} + +nlohmann::json BoltRecordsToJSONStrings(std::vector> &results) { + nlohmann::json res = nlohmann::json::object(); + std::ostringstream oss; + for (int i = 0; i < results.size(); i++) { + oss << results[i]; + res[std::to_string(i)] = oss.str(); + } + return res; +} + +/// Validation mode works on single thread with 1 query. +void ExecuteValidation( + const std::vector>> &queries, + std::ostream *stream) { + spdlog::info("Running validation mode, number of workers forced to 1"); + FLAGS_num_workers = 1; + + Metadata metadata = Metadata(); + double duration = 0.0; + std::vector> results; + + auto size = queries.size(); + + memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port); + memgraph::communication::ClientContext context(FLAGS_use_ssl); + memgraph::communication::bolt::Client client(context); + client.Connect(endpoint, FLAGS_username, FLAGS_password); + + memgraph::utils::Timer timer; + if (size == 1) { + const auto &query = queries[0]; + auto ret = ExecuteValidationNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries); + metadata.Append(ret.first); + results = ret.second; + duration = timer.Elapsed().count(); + client.Close(); + } else { + spdlog::info("Validation works with single query, pass just one query!"); + } + + nlohmann::json summary = nlohmann::json::object(); + summary["count"] = 1; + summary["duration"] = duration; + summary["metadata"] = metadata.Export(); + summary["results"] = BoltRecordsToJSONStrings(results); + summary["num_workers"] = FLAGS_num_workers; + (*stream) << summary.dump() << std::endl; } int main(int argc, char **argv) { gflags::ParseCommandLineFlags(&argc, &argv, true); + spdlog::info("Running a bolt client with following settings:"); + spdlog::info("Adress: {} ", FLAGS_address); + spdlog::info("Port: {} ", FLAGS_port); + spdlog::info("Username: {} ", FLAGS_username); + spdlog::info("Password: {} ", FLAGS_password); + spdlog::info("Usessl: {} ", FLAGS_use_ssl); + spdlog::info("Num of worker: {}", FLAGS_num_workers); + spdlog::info("Max retries: {}", FLAGS_max_retries); + spdlog::info("Query JSON: {}", FLAGS_queries_json); + spdlog::info("Input: {}", FLAGS_input); + spdlog::info("Output: {}", FLAGS_output); + spdlog::info("Validation: {}", FLAGS_validation); + memgraph::communication::SSLInit sslInit; std::ifstream ifile; @@ -291,7 +423,12 @@ int main(int argc, char **argv) { queries.emplace_back(query, std::move(bolt_param.ValueMap())); } } - Execute(queries, ostream); + + if (!FLAGS_validation) { + Execute(queries, ostream); + } else { + ExecuteValidation(queries, ostream); + } return 0; } diff --git a/tests/mgbench/runners.py b/tests/mgbench/runners.py index 5eb2f84b3..3d3aa966e 100644 --- a/tests/mgbench/runners.py +++ b/tests/mgbench/runners.py @@ -398,7 +398,13 @@ class Client: password=self._password, port=self._bolt_port, ) + ret = subprocess.run(args, capture_output=True, check=True) + error = ret.stderr.decode("utf-8").strip().split("\n") + if error and error[0] != "": + print("Reported errros from client") + print(error) + data = ret.stdout.decode("utf-8").strip().split("\n") - # data = [x for x in data if not x.startswith("[")] + data = [x for x in data if not x.startswith("[")] return list(map(json.loads, data))