Improve mgbench C++ client (#760)
This commit is contained in:
parent
862a1afdf1
commit
5e2ee6c817
@ -1,4 +1,4 @@
|
||||
// Copyright 2022 Memgraph Ltd.
|
||||
// Copyright 2023 Memgraph Ltd.
|
||||
//
|
||||
// Use of this software is governed by the Business Source License
|
||||
// included in the file licenses/BSL.txt; by using this file, you agree to be bound by the terms of the Business Source
|
||||
@ -16,17 +16,23 @@
|
||||
#include <fstream>
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <numeric>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <math.h>
|
||||
#include <json/json.hpp>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/v1/value.hpp"
|
||||
#include "communication/init.hpp"
|
||||
#include "spdlog/formatter.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "utils/exceptions.hpp"
|
||||
#include "utils/logging.hpp"
|
||||
#include "utils/string.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
@ -48,6 +54,10 @@ DEFINE_bool(queries_json, false,
|
||||
|
||||
DEFINE_string(input, "", "Input file. By default stdin is used.");
|
||||
DEFINE_string(output, "", "Output file. By default stdout is used.");
|
||||
DEFINE_bool(validation, false,
|
||||
"Set to true to run client in validation mode."
|
||||
"Validation mode works for singe query and returns results for validation"
|
||||
"with metadata");
|
||||
|
||||
std::pair<std::map<std::string, memgraph::communication::bolt::Value>, uint64_t> ExecuteNTimesTillSuccess(
|
||||
memgraph::communication::bolt::Client *client, const std::string &query,
|
||||
@ -55,6 +65,7 @@ std::pair<std::map<std::string, memgraph::communication::bolt::Value>, uint64_t>
|
||||
for (uint64_t i = 0; i < max_attempts; ++i) {
|
||||
try {
|
||||
auto ret = client->Execute(query, params);
|
||||
|
||||
return {std::move(ret.metadata), i};
|
||||
} catch (const memgraph::utils::BasicException &e) {
|
||||
if (i == max_attempts - 1) {
|
||||
@ -67,6 +78,28 @@ std::pair<std::map<std::string, memgraph::communication::bolt::Value>, uint64_t>
|
||||
LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts);
|
||||
}
|
||||
|
||||
// Validation returns results and metadata
|
||||
std::pair<std::map<std::string, memgraph::communication::bolt::Value>,
|
||||
std::vector<std::vector<memgraph::communication::bolt::Value>>>
|
||||
ExecuteValidationNTimesTillSuccess(memgraph::communication::bolt::Client *client, const std::string &query,
|
||||
const std::map<std::string, memgraph::communication::bolt::Value> ¶ms,
|
||||
int max_attempts) {
|
||||
for (uint64_t i = 0; i < max_attempts; ++i) {
|
||||
try {
|
||||
auto ret = client->Execute(query, params);
|
||||
|
||||
return {std::move(ret.metadata), std::move(ret.records)};
|
||||
} catch (const memgraph::utils::BasicException &e) {
|
||||
if (i == max_attempts - 1) {
|
||||
LOG_FATAL("Could not execute query '{}' {} times! Error message: {}", query, max_attempts, e.what());
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG_FATAL("Could not execute query '{}' {} times!", query, max_attempts);
|
||||
}
|
||||
|
||||
memgraph::communication::bolt::Value JsonToBoltValue(const nlohmann::json &data) {
|
||||
switch (data.type()) {
|
||||
case nlohmann::json::value_t::null:
|
||||
@ -158,6 +191,35 @@ class Metadata final {
|
||||
std::map<std::string, Record> storage_;
|
||||
};
|
||||
|
||||
nlohmann::json LatencyStatistics(std::vector<std::vector<double>> &worker_query_latency) {
|
||||
nlohmann::json statistics = nlohmann::json::object();
|
||||
std::vector<double> query_latency;
|
||||
for (int i = 0; i < FLAGS_num_workers; i++) {
|
||||
for (auto &e : worker_query_latency[i]) {
|
||||
query_latency.push_back(e);
|
||||
}
|
||||
}
|
||||
auto iterations = query_latency.size();
|
||||
const int lower_bound = 10;
|
||||
if (iterations > lower_bound) {
|
||||
std::sort(query_latency.begin(), query_latency.end());
|
||||
statistics["iterations"] = iterations;
|
||||
statistics["min"] = query_latency.front();
|
||||
statistics["max"] = query_latency.back();
|
||||
statistics["mean"] = std::accumulate(query_latency.begin(), query_latency.end(), 0.0) / iterations;
|
||||
statistics["p99"] = query_latency[floor(iterations * 0.99)];
|
||||
statistics["p95"] = query_latency[floor(iterations * 0.95)];
|
||||
statistics["p90"] = query_latency[floor(iterations * 0.90)];
|
||||
statistics["p75"] = query_latency[floor(iterations * 0.75)];
|
||||
statistics["p50"] = query_latency[floor(iterations * 0.50)];
|
||||
|
||||
} else {
|
||||
spdlog::info("To few iterations to calculate latency values!");
|
||||
statistics["iterations"] = iterations;
|
||||
}
|
||||
return statistics;
|
||||
}
|
||||
|
||||
void Execute(
|
||||
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
|
||||
std::ostream *stream) {
|
||||
@ -167,6 +229,7 @@ void Execute(
|
||||
std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
|
||||
std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
|
||||
std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
|
||||
std::vector<std::vector<double>> worker_query_durations(FLAGS_num_workers);
|
||||
|
||||
// Start workers and execute queries.
|
||||
auto size = queries.size();
|
||||
@ -187,16 +250,20 @@ void Execute(
|
||||
auto &retries = worker_retries[worker];
|
||||
auto &metadata = worker_metadata[worker];
|
||||
auto &duration = worker_duration[worker];
|
||||
memgraph::utils::Timer timer;
|
||||
auto &query_duration = worker_query_durations[worker];
|
||||
|
||||
memgraph::utils::Timer worker_timer;
|
||||
while (true) {
|
||||
auto pos = position.fetch_add(1, std::memory_order_acq_rel);
|
||||
if (pos >= size) break;
|
||||
const auto &query = queries[pos];
|
||||
memgraph::utils::Timer query_timer;
|
||||
auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
|
||||
query_duration.push_back(query_timer.Elapsed().count());
|
||||
retries += ret.second;
|
||||
metadata.Append(ret.first);
|
||||
}
|
||||
duration = timer.Elapsed().count();
|
||||
duration = worker_timer.Elapsed().count();
|
||||
client.Close();
|
||||
}));
|
||||
}
|
||||
@ -218,6 +285,7 @@ void Execute(
|
||||
final_retries += worker_retries[i];
|
||||
final_duration += worker_duration[i];
|
||||
}
|
||||
|
||||
final_duration /= FLAGS_num_workers;
|
||||
nlohmann::json summary = nlohmann::json::object();
|
||||
summary["count"] = queries.size();
|
||||
@ -226,12 +294,76 @@ void Execute(
|
||||
summary["retries"] = final_retries;
|
||||
summary["metadata"] = final_metadata.Export();
|
||||
summary["num_workers"] = FLAGS_num_workers;
|
||||
summary["latency_stats"] = LatencyStatistics(worker_query_durations);
|
||||
(*stream) << summary.dump() << std::endl;
|
||||
}
|
||||
|
||||
nlohmann::json BoltRecordsToJSONStrings(std::vector<std::vector<memgraph::communication::bolt::Value>> &results) {
|
||||
nlohmann::json res = nlohmann::json::object();
|
||||
std::ostringstream oss;
|
||||
for (int i = 0; i < results.size(); i++) {
|
||||
oss << results[i];
|
||||
res[std::to_string(i)] = oss.str();
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Validation mode works on single thread with 1 query.
|
||||
void ExecuteValidation(
|
||||
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
|
||||
std::ostream *stream) {
|
||||
spdlog::info("Running validation mode, number of workers forced to 1");
|
||||
FLAGS_num_workers = 1;
|
||||
|
||||
Metadata metadata = Metadata();
|
||||
double duration = 0.0;
|
||||
std::vector<std::vector<memgraph::communication::bolt::Value>> results;
|
||||
|
||||
auto size = queries.size();
|
||||
|
||||
memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
|
||||
memgraph::communication::ClientContext context(FLAGS_use_ssl);
|
||||
memgraph::communication::bolt::Client client(context);
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
|
||||
memgraph::utils::Timer timer;
|
||||
if (size == 1) {
|
||||
const auto &query = queries[0];
|
||||
auto ret = ExecuteValidationNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
|
||||
metadata.Append(ret.first);
|
||||
results = ret.second;
|
||||
duration = timer.Elapsed().count();
|
||||
client.Close();
|
||||
} else {
|
||||
spdlog::info("Validation works with single query, pass just one query!");
|
||||
}
|
||||
|
||||
nlohmann::json summary = nlohmann::json::object();
|
||||
summary["count"] = 1;
|
||||
summary["duration"] = duration;
|
||||
summary["metadata"] = metadata.Export();
|
||||
summary["results"] = BoltRecordsToJSONStrings(results);
|
||||
summary["num_workers"] = FLAGS_num_workers;
|
||||
|
||||
(*stream) << summary.dump() << std::endl;
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
|
||||
spdlog::info("Running a bolt client with following settings:");
|
||||
spdlog::info("Adress: {} ", FLAGS_address);
|
||||
spdlog::info("Port: {} ", FLAGS_port);
|
||||
spdlog::info("Username: {} ", FLAGS_username);
|
||||
spdlog::info("Password: {} ", FLAGS_password);
|
||||
spdlog::info("Usessl: {} ", FLAGS_use_ssl);
|
||||
spdlog::info("Num of worker: {}", FLAGS_num_workers);
|
||||
spdlog::info("Max retries: {}", FLAGS_max_retries);
|
||||
spdlog::info("Query JSON: {}", FLAGS_queries_json);
|
||||
spdlog::info("Input: {}", FLAGS_input);
|
||||
spdlog::info("Output: {}", FLAGS_output);
|
||||
spdlog::info("Validation: {}", FLAGS_validation);
|
||||
|
||||
memgraph::communication::SSLInit sslInit;
|
||||
|
||||
std::ifstream ifile;
|
||||
@ -291,7 +423,12 @@ int main(int argc, char **argv) {
|
||||
queries.emplace_back(query, std::move(bolt_param.ValueMap()));
|
||||
}
|
||||
}
|
||||
Execute(queries, ostream);
|
||||
|
||||
if (!FLAGS_validation) {
|
||||
Execute(queries, ostream);
|
||||
} else {
|
||||
ExecuteValidation(queries, ostream);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -398,7 +398,13 @@ class Client:
|
||||
password=self._password,
|
||||
port=self._bolt_port,
|
||||
)
|
||||
|
||||
ret = subprocess.run(args, capture_output=True, check=True)
|
||||
error = ret.stderr.decode("utf-8").strip().split("\n")
|
||||
if error and error[0] != "":
|
||||
print("Reported errros from client")
|
||||
print(error)
|
||||
|
||||
data = ret.stdout.decode("utf-8").strip().split("\n")
|
||||
# data = [x for x in data if not x.startswith("[")]
|
||||
data = [x for x in data if not x.startswith("[")]
|
||||
return list(map(json.loads, data))
|
||||
|
Loading…
Reference in New Issue
Block a user