Add HA benchmark
Summary: A simple benchmark that starts an HA cluster with 3 machines. The benchmark issues only `CREATE (:Node)` queries. Local results (debug build) for this Raft config are: ``` duration 4.26899 executed_writes 300 write_per_second 70.2743 ``` Reviewers: ipaljak, mferencevic Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1798
This commit is contained in:
parent
2730f2d35f
commit
c39a2278ae
@ -31,7 +31,7 @@
|
||||
mkdir build_release
|
||||
cd build_release
|
||||
cmake -DCMAKE_BUILD_TYPE=release ..
|
||||
TIMEOUT=1200 make -j$THREADS memgraph memgraph_distributed tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark
|
||||
TIMEOUT=1200 make -j$THREADS memgraph memgraph_distributed memgraph_ha tools memgraph__macro_benchmark memgraph__stress memgraph__manual__card_fraud_generate_snapshot memgraph__feature_benchmark__kafka__benchmark memgraph__feature_benchmark__ha__benchmark
|
||||
|
||||
# Generate distributed card fraud dataset.
|
||||
cd ../tests/distributed/card_fraud
|
||||
|
@ -1,2 +1,5 @@
|
||||
# kafka test binaries
|
||||
add_subdirectory(kafka)
|
||||
|
||||
# ha test binaries
|
||||
add_subdirectory(ha)
|
||||
|
@ -8,3 +8,14 @@
|
||||
- ../../../build_release/tests/feature_benchmark/kafka/kafka.py # kafka script
|
||||
- ../../../build_release/tests/feature_benchmark/kafka/benchmark # benchmark binary
|
||||
enable_network: true
|
||||
|
||||
- name: feature_benchmark__ha
|
||||
cd: ha
|
||||
commands: ./runner.sh
|
||||
infiles:
|
||||
- runner.sh # runner script
|
||||
- raft.json # raft configuration file
|
||||
- coordination.json # coordination configuration file
|
||||
- ../../../build_release/tests/feature_benchmark/ha/benchmark # benchmark binary
|
||||
- ../../../build_release/memgraph_ha # memgraph binary
|
||||
enable_network: true
|
||||
|
6
tests/feature_benchmark/ha/CMakeLists.txt
Normal file
6
tests/feature_benchmark/ha/CMakeLists.txt
Normal file
@ -0,0 +1,6 @@
|
||||
# HA feature benchmark client.
set(target_name memgraph__feature_benchmark__ha)
set(benchmark_target_name ${target_name}__benchmark)

# The target keeps its long, globally unique name, while the produced
# executable is simply called `benchmark`.
add_executable(${benchmark_target_name} benchmark.cpp)
set_target_properties(${benchmark_target_name}
                      PROPERTIES OUTPUT_NAME benchmark)
target_link_libraries(${benchmark_target_name}
                      mg-utils
                      mg-communication)
|
106
tests/feature_benchmark/ha/benchmark.cpp
Normal file
106
tests/feature_benchmark/ha/benchmark.cpp
Normal file
@ -0,0 +1,106 @@
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <fstream>
|
||||
#include <thread>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/flag_validation.hpp"
|
||||
#include "utils/thread.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
using namespace std::literals::chrono_literals;
|
||||
|
||||
// Connection settings. `port` is the port of the first server; the client
// probes `cluster_size` consecutive ports starting from it.
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");

// Credentials and transport security.
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");

// Workload size, time limit (in seconds) and where the results are written.
DEFINE_int64(query_count, 0, "How many queries should we execute.");
DEFINE_int64(timeout, 60, "How many seconds should the benchmark wait.");
DEFINE_string(output_file, "", "Output file where the results should be.");
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::SetUsageMessage("Memgraph HA benchmark client");
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
int64_t query_counter = 0;
|
||||
std::atomic<bool> timeout_reached{false};
|
||||
std::atomic<bool> benchmark_finished{false};
|
||||
|
||||
// Kickoff a thread that will timeout after FLAGS_timeout seconds
|
||||
std::thread timeout_thread_ =
|
||||
std::thread([&timeout_reached, &benchmark_finished]() {
|
||||
utils::ThreadSetName("BenchTimeout");
|
||||
for (int64_t i = 0; i < FLAGS_timeout; ++i) {
|
||||
std::this_thread::sleep_for(1s);
|
||||
if (benchmark_finished.load()) return;
|
||||
}
|
||||
|
||||
timeout_reached.store(true);
|
||||
});
|
||||
|
||||
double duration = 0;
|
||||
double write_per_second = 0;
|
||||
|
||||
bool successful = false;
|
||||
for (int retry = 0; !successful && retry < 10; ++retry) {
|
||||
for (int i = 0; !successful && i < FLAGS_cluster_size; ++i) {
|
||||
try {
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
|
||||
utils::Timer timer;
|
||||
for (int k = 0; k < FLAGS_query_count; ++k) {
|
||||
client.Execute("CREATE (:Node)", {});
|
||||
query_counter++;
|
||||
|
||||
if (timeout_reached.load()) break;
|
||||
}
|
||||
|
||||
duration = timer.Elapsed().count();
|
||||
successful = true;
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
// This one is not the leader, continue.
|
||||
continue;
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
// This one seems to be down, continue.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (timeout_reached.load()) break;
|
||||
}
|
||||
|
||||
if (timeout_reached.load()) break;
|
||||
if (!successful) {
|
||||
LOG(INFO) << "Couldn't find Raft cluster leader, retrying...";
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
}
|
||||
|
||||
benchmark_finished.store(true);
|
||||
if (timeout_thread_.joinable()) timeout_thread_.join();
|
||||
|
||||
if (successful) {
|
||||
write_per_second = query_counter / duration;
|
||||
}
|
||||
|
||||
std::ofstream output(FLAGS_output_file);
|
||||
output << "duration " << duration << std::endl;
|
||||
output << "executed_writes " << query_counter << std::endl;
|
||||
output << "write_per_second " << write_per_second << std::endl;
|
||||
output.close();
|
||||
|
||||
if (!successful) return 1;
|
||||
return 0;
|
||||
}
|
5
tests/feature_benchmark/ha/coordination.json
Normal file
5
tests/feature_benchmark/ha/coordination.json
Normal file
@ -0,0 +1,5 @@
|
||||
[
|
||||
[1, "127.0.0.1", 10000],
|
||||
[2, "127.0.0.1", 10001],
|
||||
[3, "127.0.0.1", 10002]
|
||||
]
|
6
tests/feature_benchmark/ha/raft.json
Normal file
6
tests/feature_benchmark/ha/raft.json
Normal file
@ -0,0 +1,6 @@
|
||||
{
|
||||
"election_timeout_min": 100,
|
||||
"election_timeout_max": 300,
|
||||
"heartbeat_interval": 1,
|
||||
"replicate_timeout": 100
|
||||
}
|
75
tests/feature_benchmark/ha/runner.sh
Executable file
75
tests/feature_benchmark/ha/runner.sh
Executable file
@ -0,0 +1,75 @@
|
||||
#!/bin/bash

# Starts a local three-server memgraph_ha cluster, runs the HA benchmark
# client against it, tears the cluster down and exits with the client's
# exit code.

# The message is passed as a printf argument (not spliced into the format
# string) so that any '%' or '\' in it is printed literally.
function echo_info { printf "\033[1;36m~~ %s ~~\033[0m\n" "$1"; }
function echo_success { printf "\033[1;32m~~ %s ~~\033[0m\n\n" "$1"; }
function echo_failure { printf "\033[1;31m~~ %s ~~\033[0m\n\n" "$1"; }

## Environment setup

# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"

# Find memgraph binaries.
binary_dir="$DIR/../../../build"
if [ ! -d "$binary_dir" ]; then
    binary_dir="$DIR/../../../build_release"
fi

# Results for apollo
RESULTS="$DIR/.apollo_measurements"

# Benchmark parameters
# TODO(msantl): We're benchmarking with only 300 nodes because there is an
# O(n^2) complexity in the current Raft implementation. Once we remove this
# bottleneck, we can raise this number to test proper performance.
NODES=300

## Startup
declare -a HA_PIDS

for server_id in 1 2 3
do
    "$binary_dir/memgraph_ha" --server_id "$server_id" \
        --coordination_config_file="coordination.json" \
        --raft_config_file="raft.json" \
        --port $((7686 + server_id)) \
        --durability_directory="dur$server_id" &
    HA_PIDS[$server_id]=$!
done

# Allow some time for leader election.
sleep 3

# Start the benchmark client and wait for it to finish.
echo_info "Starting HA benchmark"
"$binary_dir/tests/feature_benchmark/ha/benchmark" \
    --query-count="$NODES" \
    --timeout=60 \
    --output-file="$RESULTS" &
pid=$!

# Wait for this specific pid. With `wait -n`, older bash versions return as
# soon as *any* background job exits, so a server dying early could be
# mistaken for the benchmark's result.
wait "$pid"
code=$?

# Shutdown
for server_id in 1 2 3
do
    kill -9 "${HA_PIDS[$server_id]}"
done

# Cleanup
for server_id in 1 2 3
do
    # Reap the killed server before removing its durability directory.
    wait "${HA_PIDS[$server_id]}"
    rm -r "dur$server_id"
done

if [ "$code" -eq 0 ]; then
    echo_success "Benchmark finished successfully"
else
    echo_failure "Benchmark didn't finish successfully"
    exit "$code"
fi

exit 0
|
Loading…
Reference in New Issue
Block a user