Change HA benchmark to run for a fixed amount of time
Summary: In order to get more consistent results, give the benchmark a certain amount of time it is supposed to run and not the number of queries. The resluts on my machine are as following: ``` duration 10.0004 executed_writes 25190 write_per_second 2518.91 duration 10.0005 executed_writes 25096 write_per_second 2509.48 duration 10.0004 executed_writes 23068 write_per_second 2306.7 duration 10.0006 executed_writes 26390 write_per_second 2638.84 duration 10.0008 executed_writes 26246 write_per_second 2624.38 duration 10.0006 executed_writes 24752 write_per_second 2475.06 duration 10.0027 executed_writes 24818 write_per_second 2481.14 duration 10.0032 executed_writes 25148 write_per_second 2513.99 duration 10.0009 executed_writes 25075 write_per_second 2507.28 duration 10.0008 executed_writes 25846 write_per_second 2584.4 duration 10.0006 executed_writes 25671 write_per_second 2566.96 duration 10.0025 executed_writes 25983 write_per_second 2597.65 ``` Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1812
This commit is contained in:
parent
12c8b3f75f
commit
aba360968c
@ -4,6 +4,7 @@
|
|||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
|
#include <fmt/format.h>
|
||||||
#include <gflags/gflags.h>
|
#include <gflags/gflags.h>
|
||||||
|
|
||||||
#include "communication/bolt/client.hpp"
|
#include "communication/bolt/client.hpp"
|
||||||
@ -21,8 +22,8 @@ DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
|||||||
DEFINE_string(username, "", "Username for the database");
|
DEFINE_string(username, "", "Username for the database");
|
||||||
DEFINE_string(password, "", "Password for the database");
|
DEFINE_string(password, "", "Password for the database");
|
||||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||||
DEFINE_int64(query_count, 0, "How many queries should we execute.");
|
DEFINE_double(duration, 10.0,
|
||||||
DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait.");
|
"How long should the client perform writes (seconds)");
|
||||||
DEFINE_string(output_file, "", "Output file where the results should be.");
|
DEFINE_string(output_file, "", "Output file where the results should be.");
|
||||||
|
|
||||||
std::experimental::optional<io::network::Endpoint> GetLeaderEndpoint() {
|
std::experimental::optional<io::network::Endpoint> GetLeaderEndpoint() {
|
||||||
@ -50,7 +51,6 @@ std::experimental::optional<io::network::Endpoint> GetLeaderEndpoint() {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
|
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
|
||||||
std::this_thread::sleep_for(1s);
|
std::this_thread::sleep_for(1s);
|
||||||
}
|
}
|
||||||
@ -64,8 +64,6 @@ int main(int argc, char **argv) {
|
|||||||
google::InitGoogleLogging(argv[0]);
|
google::InitGoogleLogging(argv[0]);
|
||||||
|
|
||||||
std::atomic<int64_t> query_counter{0};
|
std::atomic<int64_t> query_counter{0};
|
||||||
std::atomic<bool> timeout_reached{false};
|
|
||||||
std::atomic<bool> benchmark_finished{false};
|
|
||||||
|
|
||||||
auto leader_endpoint = GetLeaderEndpoint();
|
auto leader_endpoint = GetLeaderEndpoint();
|
||||||
if (!leader_endpoint) {
|
if (!leader_endpoint) {
|
||||||
@ -73,56 +71,48 @@ int main(int argc, char **argv) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Kickoff a thread that will timeout after FLAGS_timeout seconds
|
const int num_threads = std::thread::hardware_concurrency();
|
||||||
std::thread timeout_thread_ =
|
|
||||||
std::thread([&timeout_reached, &benchmark_finished]() {
|
|
||||||
utils::ThreadSetName("BenchTimeout");
|
|
||||||
for (int64_t i = 0; i < FLAGS_timeout; ++i) {
|
|
||||||
std::this_thread::sleep_for(1s);
|
|
||||||
if (benchmark_finished.load()) return;
|
|
||||||
}
|
|
||||||
|
|
||||||
timeout_reached.store(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
|
std::vector<double> thread_duration;
|
||||||
|
threads.reserve(num_threads);
|
||||||
|
thread_duration.resize(num_threads);
|
||||||
|
|
||||||
for (int i = 0; i < std::thread::hardware_concurrency(); ++i) {
|
for (int i = 0; i < num_threads; ++i) {
|
||||||
threads.emplace_back(
|
threads.emplace_back([i, endpoint = *leader_endpoint, &query_counter,
|
||||||
[endpoint = *leader_endpoint, &timeout_reached, &query_counter]() {
|
&local_duration = thread_duration[i]]() {
|
||||||
communication::ClientContext context(FLAGS_use_ssl);
|
utils::ThreadSetName(fmt::format("BenchWriter{}", i));
|
||||||
communication::bolt::Client client(&context);
|
communication::ClientContext context(FLAGS_use_ssl);
|
||||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
communication::bolt::Client client(&context);
|
||||||
|
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||||
|
|
||||||
while (query_counter.load() < FLAGS_query_count) {
|
utils::Timer t;
|
||||||
if (timeout_reached.load()) break;
|
while (true) {
|
||||||
|
local_duration = t.Elapsed().count();
|
||||||
|
if (local_duration >= FLAGS_duration) break;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
client.Execute("CREATE (:Node)", {});
|
client.Execute("CREATE (:Node)", {});
|
||||||
query_counter.fetch_add(1);
|
query_counter.fetch_add(1);
|
||||||
} catch (const communication::bolt::ClientQueryException &e) {
|
} catch (const communication::bolt::ClientQueryException &e) {
|
||||||
LOG(WARNING) << e.what();
|
LOG(WARNING) << e.what();
|
||||||
break;
|
break;
|
||||||
} catch (const communication::bolt::ClientFatalException &e) {
|
} catch (const communication::bolt::ClientFatalException &e) {
|
||||||
LOG(WARNING) << e.what();
|
LOG(WARNING) << e.what();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
utils::Timer timer;
|
|
||||||
int64_t query_offset = query_counter.load();
|
|
||||||
|
|
||||||
for (auto &t : threads) {
|
for (auto &t : threads) {
|
||||||
if (t.joinable()) t.join();
|
if (t.joinable()) t.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
double duration = timer.Elapsed().count();
|
double duration = 0;
|
||||||
double write_per_second = (query_counter - query_offset) / duration;
|
for (auto &d : thread_duration) duration += d;
|
||||||
|
duration /= num_threads;
|
||||||
|
|
||||||
benchmark_finished.store(true);
|
double write_per_second = query_counter / duration;
|
||||||
if (timeout_thread_.joinable()) timeout_thread_.join();
|
|
||||||
|
|
||||||
std::ofstream output(FLAGS_output_file);
|
std::ofstream output(FLAGS_output_file);
|
||||||
output << "duration " << duration << std::endl;
|
output << "duration " << duration << std::endl;
|
||||||
|
@ -20,7 +20,7 @@ fi
|
|||||||
RESULTS="$DIR/.apollo_measurements"
|
RESULTS="$DIR/.apollo_measurements"
|
||||||
|
|
||||||
# Benchmark parameters
|
# Benchmark parameters
|
||||||
NODES=150000
|
DURATION=10
|
||||||
|
|
||||||
## Startup
|
## Startup
|
||||||
declare -a HA_PIDS
|
declare -a HA_PIDS
|
||||||
@ -41,8 +41,7 @@ sleep 3
|
|||||||
# Start the memgraph process and wait for it to start.
|
# Start the memgraph process and wait for it to start.
|
||||||
echo_info "Starting HA benchmark"
|
echo_info "Starting HA benchmark"
|
||||||
$binary_dir/tests/feature_benchmark/ha/benchmark \
|
$binary_dir/tests/feature_benchmark/ha/benchmark \
|
||||||
--query-count=$NODES \
|
--duration=$DURATION \
|
||||||
--timeout=60 \
|
|
||||||
--output-file=$RESULTS &
|
--output-file=$RESULTS &
|
||||||
pid=$!
|
pid=$!
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user