diff --git a/tests/feature_benchmark/ha/benchmark.cpp b/tests/feature_benchmark/ha/benchmark.cpp index 2e32f0168..80e0abae1 100644 --- a/tests/feature_benchmark/ha/benchmark.cpp +++ b/tests/feature_benchmark/ha/benchmark.cpp @@ -4,6 +4,7 @@ #include #include +#include #include #include "communication/bolt/client.hpp" @@ -21,8 +22,8 @@ DEFINE_int32(cluster_size, 3, "Size of the raft cluster."); DEFINE_string(username, "", "Username for the database"); DEFINE_string(password, "", "Password for the database"); DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server."); -DEFINE_int64(query_count, 0, "How many queries should we execute."); -DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait."); +DEFINE_double(duration, 10.0, + "How long should the client perform writes (seconds)"); DEFINE_string(output_file, "", "Output file where the results should be."); std::experimental::optional GetLeaderEndpoint() { @@ -50,7 +51,6 @@ std::experimental::optional GetLeaderEndpoint() { continue; } } - LOG(INFO) << "Couldn't find Raft cluster leader, retrying..."; std::this_thread::sleep_for(1s); } @@ -64,8 +64,6 @@ int main(int argc, char **argv) { google::InitGoogleLogging(argv[0]); std::atomic query_counter{0}; - std::atomic timeout_reached{false}; - std::atomic benchmark_finished{false}; auto leader_endpoint = GetLeaderEndpoint(); if (!leader_endpoint) { @@ -73,56 +71,48 @@ int main(int argc, char **argv) { return 1; } - // Kickoff a thread that will timeout after FLAGS_timeout seconds - std::thread timeout_thread_ = - std::thread([&timeout_reached, &benchmark_finished]() { - utils::ThreadSetName("BenchTimeout"); - for (int64_t i = 0; i < FLAGS_timeout; ++i) { - std::this_thread::sleep_for(1s); - if (benchmark_finished.load()) return; - } - - timeout_reached.store(true); - }); - + const int num_threads = std::thread::hardware_concurrency(); std::vector threads; + std::vector thread_duration; + threads.reserve(num_threads); + thread_duration.resize(num_threads); - for (int i = 0; i < std::thread::hardware_concurrency(); ++i) { - threads.emplace_back( - [endpoint = *leader_endpoint, &timeout_reached, &query_counter]() { - communication::ClientContext context(FLAGS_use_ssl); - communication::bolt::Client client(&context); - client.Connect(endpoint, FLAGS_username, FLAGS_password); + for (int i = 0; i < num_threads; ++i) { + threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter, + &local_duration = thread_duration[i]]() { + utils::ThreadSetName(fmt::format("BenchWriter{}", i)); + communication::ClientContext context(FLAGS_use_ssl); + communication::bolt::Client client(&context); + client.Connect(endpoint, FLAGS_username, FLAGS_password); - while (query_counter.load() < FLAGS_query_count) { - if (timeout_reached.load()) break; + utils::Timer t; + while (true) { + local_duration = t.Elapsed().count(); + if (local_duration >= FLAGS_duration) break; - try { - client.Execute("CREATE (:Node)", {}); - query_counter.fetch_add(1); - } catch (const communication::bolt::ClientQueryException &e) { - LOG(WARNING) << e.what(); - break; - } catch (const communication::bolt::ClientFatalException &e) { - LOG(WARNING) << e.what(); - break; - } - } - }); + try { + client.Execute("CREATE (:Node)", {}); + query_counter.fetch_add(1); + } catch (const communication::bolt::ClientQueryException &e) { + LOG(WARNING) << e.what(); + break; + } catch (const communication::bolt::ClientFatalException &e) { + LOG(WARNING) << e.what(); + break; + } + } + }); } - utils::Timer timer; - int64_t query_offset = query_counter.load(); - for (auto &t : threads) { if (t.joinable()) t.join(); } - double duration = timer.Elapsed().count(); - double write_per_second = (query_counter - query_offset) / duration; + double duration = 0; + for (auto &d : thread_duration) duration += d; + duration /= num_threads; - benchmark_finished.store(true); - if (timeout_thread_.joinable()) timeout_thread_.join(); + double write_per_second = query_counter / duration; std::ofstream output(FLAGS_output_file); output << "duration " << duration << std::endl; diff --git a/tests/feature_benchmark/ha/runner.sh b/tests/feature_benchmark/ha/runner.sh index b25d5c988..1fe652edd 100755 --- a/tests/feature_benchmark/ha/runner.sh +++ b/tests/feature_benchmark/ha/runner.sh @@ -20,7 +20,7 @@ fi RESULTS="$DIR/.apollo_measurements" # Benchmark parameters -NODES=150000 +DURATION=10 ## Startup declare -a HA_PIDS @@ -41,8 +41,7 @@ sleep 3 # Start the memgraph process and wait for it to start. echo_info "Starting HA benchmark" $binary_dir/tests/feature_benchmark/ha/benchmark \ - --query-count=$NODES \ - --timeout=60 \ + --duration=$DURATION \ --output-file=$RESULTS & pid=$!