Add time-dependent execution to the mgbench client (#805)

Ante Javor 2023-03-18 20:18:58 +01:00 committed by GitHub
parent c4167bafdd
commit 6349fc9501

@@ -58,6 +58,10 @@ DEFINE_bool(validation, false,
"Set to true to run client in validation mode."
"Validation mode works for a single query and returns results for validation"
"with metadata");
DEFINE_int64(time_dependent_execution, 0,
"Time-dependent execution runs the queries for a specified number of seconds."
" If all queries are executed and there is still time left, the queries are rerun."
" When the time runs out, the client finishes the job and returns the results.");
std::pair<std::map<std::string, memgraph::communication::bolt::Value>, uint64_t> ExecuteNTimesTillSuccess(
memgraph::communication::bolt::Client *client, const std::string &query,
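
The new flag turns the fixed query list into a time-boxed workload: the client keeps pulling queries, wrapping back to the start of the list, until the configured number of seconds has elapsed (see ExecuteTimeDependentWorkload below). A minimal, single-threaded sketch of that pattern; the queries and the 30-second limit are placeholders, not values from the client:

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Sketch of the time-boxed loop behind --time_dependent_execution:
// keep rerunning the query list until the wall-clock deadline passes.
int main() {
  const std::vector<std::string> queries = {"RETURN 1;", "RETURN 2;"};  // placeholder queries
  const auto time_limit = std::chrono::seconds(30);                     // stands in for the flag value
  const auto start = std::chrono::steady_clock::now();
  std::uint64_t executed = 0;
  std::size_t pos = 0;
  while (std::chrono::steady_clock::now() - start < time_limit) {
    // The real client sends queries[pos] to the database over Bolt here.
    ++executed;
    pos = (pos + 1) % queries.size();  // wrap back to the start of the list and rerun
  }
  std::cout << "executed " << executed << " queries\n";
}
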
@@ -220,7 +224,113 @@ nlohmann::json LatencyStatistics(std::vector<std::vector<double>> &worker_query_
return statistics;
}
-void Execute(
+void ExecuteTimeDependentWorkload(
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
std::ostream *stream) {
std::vector<std::thread> threads;
threads.reserve(FLAGS_num_workers);
std::vector<uint64_t> worker_retries(FLAGS_num_workers, 0);
std::vector<Metadata> worker_metadata(FLAGS_num_workers, Metadata());
std::vector<double> worker_duration(FLAGS_num_workers, 0.0);
std::vector<std::vector<double>> worker_query_durations(FLAGS_num_workers);
// Start workers and execute queries.
auto size = queries.size();
std::atomic<bool> run(false);
std::atomic<uint64_t> ready(0);
std::atomic<uint64_t> position(0);
std::atomic<bool> start_workload_timer(false);
std::chrono::time_point<std::chrono::steady_clock> workload_start;
std::chrono::duration<double> time_limit = std::chrono::seconds(FLAGS_time_dependent_execution);
for (int worker = 0; worker < FLAGS_num_workers; ++worker) {
threads.push_back(std::thread([&, worker]() {
memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port);
memgraph::communication::ClientContext context(FLAGS_use_ssl);
memgraph::communication::bolt::Client client(context);
client.Connect(endpoint, FLAGS_username, FLAGS_password);
ready.fetch_add(1, std::memory_order_acq_rel);
while (!run.load(std::memory_order_acquire))
;
auto &retries = worker_retries[worker];
auto &metadata = worker_metadata[worker];
auto &duration = worker_duration[worker];
auto &query_duration = worker_query_durations[worker];
// After all threads have been initialised, start the workload timer
if (!start_workload_timer.load()) {
workload_start = std::chrono::steady_clock::now();
start_workload_timer.store(true);
}
memgraph::utils::Timer worker_timer;
while (std::chrono::duration_cast<std::chrono::duration<double>>(std::chrono::steady_clock::now() -
workload_start) < time_limit) {
auto pos = position.fetch_add(1, std::memory_order_acq_rel);
if (pos >= size) {
// Wrap back to the initial position
position.store(0, std::memory_order_release);
pos = position.fetch_add(1, std::memory_order_acq_rel);
}
const auto &query = queries[pos];
memgraph::utils::Timer query_timer;
auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
query_duration.emplace_back(query_timer.Elapsed().count());
retries += ret.second;
metadata.Append(ret.first);
duration = worker_timer.Elapsed().count();
}
client.Close();
}));
}
// Synchronize workers and collect runtime.
while (ready.load(std::memory_order_acquire) < FLAGS_num_workers)
;
run.store(true);
for (int i = 0; i < FLAGS_num_workers; ++i) {
threads[i].join();
}
// Create and output summary.
Metadata final_metadata;
uint64_t final_retries = 0;
double final_duration = 0.0;
for (int i = 0; i < FLAGS_num_workers; ++i) {
final_metadata += worker_metadata[i];
final_retries += worker_retries[i];
final_duration += worker_duration[i];
}
int total_iterations = 0;
std::for_each(worker_query_durations.begin(), worker_query_durations.end(),
[&](const std::vector<double> &v) { total_iterations += v.size(); });
final_duration /= FLAGS_num_workers;
double execution_delta = time_limit.count() / final_duration;
// Throughput adjusted for how much the measured execution time deviated from the requested time limit.
double throughput = (total_iterations / final_duration) * execution_delta;
double raw_throughput = total_iterations / final_duration;
nlohmann::json summary = nlohmann::json::object();
summary["count"] = queries.size();
summary["duration"] = final_duration;
summary["time_limit"] = FLAGS_time_dependent_execution;
summary["queries_executed"] = total_iterations;
summary["throughput"] = throughput;
summary["raw_throughput"] = raw_throughput;
summary["latency_stats"] = LatencyStatistics(worker_query_durations);
summary["retries"] = final_retries;
summary["metadata"] = final_metadata.Export();
summary["num_workers"] = FLAGS_num_workers;
(*stream) << summary.dump() << std::endl;
}
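
The summary above reports two throughput figures: raw_throughput is queries per second over the averaged per-worker duration, while throughput additionally scales that by execution_delta = time_limit / final_duration, compensating for runs whose measured duration drifted from the requested limit. A small worked example with invented numbers:

#include <iostream>

// Worked example of the adjustment (all numbers invented): the workers executed
// 12000 queries in total, their averaged duration was 32 s, and
// --time_dependent_execution asked for 30 s.
int main() {
  const double time_limit = 30.0;      // requested seconds
  const double final_duration = 32.0;  // averaged per-worker duration in seconds
  const int total_iterations = 12000;  // queries actually executed

  const double execution_delta = time_limit / final_duration;       // 0.9375
  const double raw_throughput = total_iterations / final_duration;  // 375 queries/s
  const double throughput = raw_throughput * execution_delta;       // ~351.6 queries/s

  std::cout << "raw: " << raw_throughput << " adjusted: " << throughput << '\n';
}
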
void ExecuteWorkload(
const std::vector<std::pair<std::string, std::map<std::string, memgraph::communication::bolt::Value>>> &queries,
std::ostream *stream) {
std::vector<std::thread> threads;
@@ -259,7 +369,7 @@ void Execute(
const auto &query = queries[pos];
memgraph::utils::Timer query_timer;
auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries);
-query_duration.push_back(query_timer.Elapsed().count());
+query_duration.emplace_back(query_timer.Elapsed().count());
retries += ret.second;
metadata.Append(ret.first);
}
@@ -272,6 +382,7 @@ void Execute(
while (ready.load(std::memory_order_acquire) < FLAGS_num_workers)
;
run.store(true, std::memory_order_release);
for (int i = 0; i < FLAGS_num_workers; ++i) {
threads[i].join();
}
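
Both workload functions rely on the same start barrier: every worker connects, bumps the atomic ready counter and spins on the atomic run flag, while the main thread waits for ready to reach FLAGS_num_workers and then releases everyone at once. A stripped-down sketch of that pattern with the usual acquire/release pairing; the worker count and the worker body are placeholders:

#include <atomic>
#include <thread>
#include <vector>

// Distilled start barrier: workers announce readiness, spin until released,
// then do their work; acquire/release pairs the flag accesses across threads.
int main() {
  const int num_workers = 4;  // placeholder for FLAGS_num_workers
  std::atomic<int> ready{0};
  std::atomic<bool> run{false};

  std::vector<std::thread> threads;
  threads.reserve(num_workers);
  for (int worker = 0; worker < num_workers; ++worker) {
    threads.emplace_back([&] {
      // ... connect the Bolt client here ...
      ready.fetch_add(1, std::memory_order_acq_rel);
      while (!run.load(std::memory_order_acquire)) {
      }
      // ... execute queries ...
    });
  }
  while (ready.load(std::memory_order_acquire) < num_workers) {
  }
  run.store(true, std::memory_order_release);
  for (auto &t : threads) t.join();
}
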
@@ -363,6 +474,7 @@ int main(int argc, char **argv) {
spdlog::info("Input: {}", FLAGS_input);
spdlog::info("Output: {}", FLAGS_output);
spdlog::info("Validation: {}", FLAGS_validation);
spdlog::info("Time-dependent execution: {}", FLAGS_time_dependent_execution);
memgraph::communication::SSLInit sslInit;
@@ -390,7 +502,7 @@ int main(int argc, char **argv) {
while (std::getline(*istream, query)) {
auto trimmed = memgraph::utils::Trim(query);
if (trimmed == "" || trimmed == ";") {
-Execute(queries, ostream);
+ExecuteWorkload(queries, ostream);
queries.clear();
continue;
}
@@ -406,7 +518,7 @@ int main(int argc, char **argv) {
"array!");
MG_ASSERT(data.is_array() && data.size() == 2, "Each item of the loaded JSON queries must be an array!");
if (data.size() == 0) {
-Execute(queries, ostream);
+ExecuteWorkload(queries, ostream);
queries.clear();
continue;
}
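
For the JSON input path, each line is expected to parse into a two-element array holding the query string and its parameter map, so an input line presumably looks something like ["MATCH (n) RETURN count(n);", {}] (an illustrative value, not taken from the repository); an empty array flushes the queries collected so far through ExecuteWorkload, mirroring the blank-line handling of the plain-text input above.
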
@@ -424,10 +536,12 @@ int main(int argc, char **argv) {
}
}
-if (!FLAGS_validation) {
-Execute(queries, ostream);
-} else {
+if (FLAGS_validation) {
ExecuteValidation(queries, ostream);
+} else if (FLAGS_time_dependent_execution > 0) {
+ExecuteTimeDependentWorkload(queries, ostream);
+} else {
+ExecuteWorkload(queries, ostream);
}
return 0;
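
With this change the client picks the workload in main as follows: validation mode still takes precedence, a positive --time_dependent_execution value selects the new time-boxed run, and anything else falls through to the plain ExecuteWorkload pass. So passing, for example, --time_dependent_execution=30 together with the existing connection, worker and input flags should benchmark the supplied queries for roughly 30 seconds and report both raw_throughput and the time-adjusted throughput in the JSON summary.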