From 6349fc950133be5b2186315419d077c041369a8f Mon Sep 17 00:00:00 2001 From: Ante Javor Date: Sat, 18 Mar 2023 20:18:58 +0100 Subject: [PATCH] Add time-depended execution to the mgbench client (#805) --- tests/mgbench/client.cpp | 128 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 7 deletions(-) diff --git a/tests/mgbench/client.cpp b/tests/mgbench/client.cpp index c7e002df6..b18567fda 100644 --- a/tests/mgbench/client.cpp +++ b/tests/mgbench/client.cpp @@ -58,6 +58,10 @@ DEFINE_bool(validation, false, "Set to true to run client in validation mode." "Validation mode works for singe query and returns results for validation" "with metadata"); +DEFINE_int64(time_dependent_execution, 0, + "Time-dependent executions execute the queries for a specified number of seconds." + "If all queries are executed, and there is still time, queries are rerun again." + "If the time runs out, the client is done with the job and returning results."); std::pair, uint64_t> ExecuteNTimesTillSuccess( memgraph::communication::bolt::Client *client, const std::string &query, @@ -220,7 +224,113 @@ nlohmann::json LatencyStatistics(std::vector> &worker_query_ return statistics; } -void Execute( +void ExecuteTimeDependentWorkload( + const std::vector>> &queries, + std::ostream *stream) { + std::vector threads; + threads.reserve(FLAGS_num_workers); + + std::vector worker_retries(FLAGS_num_workers, 0); + std::vector worker_metadata(FLAGS_num_workers, Metadata()); + std::vector worker_duration(FLAGS_num_workers, 0.0); + std::vector> worker_query_durations(FLAGS_num_workers); + + // Start workers and execute queries. + auto size = queries.size(); + std::atomic run(false); + std::atomic ready(0); + std::atomic position(0); + std::atomic start_workload_timer(false); + + std::chrono::time_point workload_start; + std::chrono::duration time_limit = std::chrono::seconds(FLAGS_time_dependent_execution); + for (int worker = 0; worker < FLAGS_num_workers; ++worker) { + threads.push_back(std::thread([&, worker]() { + memgraph::io::network::Endpoint endpoint(FLAGS_address, FLAGS_port); + memgraph::communication::ClientContext context(FLAGS_use_ssl); + memgraph::communication::bolt::Client client(context); + client.Connect(endpoint, FLAGS_username, FLAGS_password); + + ready.fetch_add(1, std::memory_order_acq_rel); + while (!run.load(std::memory_order_acq_rel)) + ; + auto &retries = worker_retries[worker]; + auto &metadata = worker_metadata[worker]; + auto &duration = worker_duration[worker]; + auto &query_duration = worker_query_durations[worker]; + + // After all threads have been initialised, start the workload timer + if (!start_workload_timer.load()) { + workload_start = std::chrono::steady_clock::now(); + start_workload_timer.store(true); + } + + memgraph::utils::Timer worker_timer; + while (std::chrono::duration_cast>(std::chrono::steady_clock::now() - + workload_start) < time_limit) { + auto pos = position.fetch_add(1, std::memory_order_acq_rel); + if (pos >= size) { + /// Get back to inital position + position.store(0, std::memory_order_acq_rel); + pos = position.fetch_add(1, std::memory_order_acq_rel); + } + const auto &query = queries[pos]; + memgraph::utils::Timer query_timer; + auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries); + query_duration.emplace_back(query_timer.Elapsed().count()); + retries += ret.second; + metadata.Append(ret.first); + duration = worker_timer.Elapsed().count(); + } + client.Close(); + })); + } + + // Synchronize workers and collect runtime. + while (ready.load(std::memory_order_acq_rel) < FLAGS_num_workers) + ; + run.store(true); + for (int i = 0; i < FLAGS_num_workers; ++i) { + threads[i].join(); + } + + // Create and output summary. + Metadata final_metadata; + uint64_t final_retries = 0; + double final_duration = 0.0; + for (int i = 0; i < FLAGS_num_workers; ++i) { + final_metadata += worker_metadata[i]; + final_retries += worker_retries[i]; + final_duration += worker_duration[i]; + } + + int total_iterations = 0; + std::for_each(worker_query_durations.begin(), worker_query_durations.end(), + [&](const std::vector &v) { total_iterations += v.size(); }); + + final_duration /= FLAGS_num_workers; + double execution_delta = time_limit.count() / final_duration; + // This is adjusted throughput based on how much longer did workload execution time took. + double throughput = (total_iterations / final_duration) * execution_delta; + double raw_throughput = total_iterations / final_duration; + + nlohmann::json summary = nlohmann::json::object(); + summary["count"] = queries.size(); + summary["duration"] = final_duration; + summary["time_limit"] = FLAGS_time_dependent_execution; + summary["queries_executed"] = total_iterations; + + summary["throughput"] = throughput; + summary["raw_throughput"] = raw_throughput; + summary["latency_stats"] = LatencyStatistics(worker_query_durations); + summary["retries"] = final_retries; + summary["metadata"] = final_metadata.Export(); + summary["num_workers"] = FLAGS_num_workers; + + (*stream) << summary.dump() << std::endl; +} + +void ExecuteWorkload( const std::vector>> &queries, std::ostream *stream) { std::vector threads; @@ -259,7 +369,7 @@ void Execute( const auto &query = queries[pos]; memgraph::utils::Timer query_timer; auto ret = ExecuteNTimesTillSuccess(&client, query.first, query.second, FLAGS_max_retries); - query_duration.push_back(query_timer.Elapsed().count()); + query_duration.emplace_back(query_timer.Elapsed().count()); retries += ret.second; metadata.Append(ret.first); } @@ -272,6 +382,7 @@ void Execute( while (ready.load(std::memory_order_acq_rel) < FLAGS_num_workers) ; run.store(true, std::memory_order_acq_rel); + for (int i = 0; i < FLAGS_num_workers; ++i) { threads[i].join(); } @@ -363,6 +474,7 @@ int main(int argc, char **argv) { spdlog::info("Input: {}", FLAGS_input); spdlog::info("Output: {}", FLAGS_output); spdlog::info("Validation: {}", FLAGS_validation); + spdlog::info("Time dependend execution: {}", FLAGS_time_dependent_execution); memgraph::communication::SSLInit sslInit; @@ -390,7 +502,7 @@ int main(int argc, char **argv) { while (std::getline(*istream, query)) { auto trimmed = memgraph::utils::Trim(query); if (trimmed == "" || trimmed == ";") { - Execute(queries, ostream); + ExecuteWorkload(queries, ostream); queries.clear(); continue; } @@ -406,7 +518,7 @@ int main(int argc, char **argv) { "array!"); MG_ASSERT(data.is_array() && data.size() == 2, "Each item of the loaded JSON queries must be an array!"); if (data.size() == 0) { - Execute(queries, ostream); + ExecuteWorkload(queries, ostream); queries.clear(); continue; } @@ -424,10 +536,12 @@ int main(int argc, char **argv) { } } - if (!FLAGS_validation) { - Execute(queries, ostream); - } else { + if (FLAGS_validation) { ExecuteValidation(queries, ostream); + } else if (FLAGS_time_dependent_execution > 0) { + ExecuteTimeDependentWorkload(queries, ostream); + } else { + ExecuteWorkload(queries, ostream); } return 0;