Add end2end replication tests based on mgclient (#69)
* Remove old HA benchmark and integration tests
parent 10c8256ec9
commit afbf672915
.github/workflows/diff.yaml (9 changed lines)
@@ -289,6 +289,15 @@ jobs:
            tests/gql_behave/gql_behave_status.csv
            tests/gql_behave/gql_behave_status.html

      - name: Run e2e replication tests
        run: |
          # TODO(gitbuda): Setup mgclient and pymgclient properly.
          cd tests
          ./setup.sh
          source ve3/bin/activate
          cd e2e
          LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml

      - name: Run stress test (plain)
        run: |
          cd tests/stress

.github/workflows/release.yaml (9 changed lines)
@@ -281,6 +281,15 @@ jobs:
            tests/gql_behave/gql_behave_status.csv
            tests/gql_behave/gql_behave_status.html

      - name: Run e2e replication tests
        run: |
          # TODO(gitbuda): Setup mgclient and pymgclient properly.
          cd tests
          ./setup.sh
          source ve3/bin/activate
          cd e2e
          LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml

      - name: Run stress test (plain)
        run: |
          cd tests/stress

.github/workflows/release_centos.yaml (9 changed lines)
@@ -280,6 +280,15 @@ jobs:
            tests/gql_behave/gql_behave_status.csv
            tests/gql_behave/gql_behave_status.html

      - name: Run e2e replication tests
        run: |
          # TODO(gitbuda): Setup mgclient and pymgclient properly.
          cd tests
          ./setup.sh
          source ve3/bin/activate
          cd e2e
          LD_LIBRARY_PATH=$LD_LIBRARY_PATH:../../libs/mgclient/lib python runner.py --workloads-path replication/workloads.yaml

      - name: Run stress test (plain)
        run: |
          cd tests/stress

init (10 changed lines)
@@ -80,7 +80,7 @@ echo \
(ql:quickload '(:lcp :lcp/test) :silent t)
" | sbcl --script

-# setup libs (download)
+# Setup libs (download).
cd libs
./cleanup.sh
./setup.sh
@@ -95,6 +95,14 @@ setup_virtualenv tests/stress
# setup integration/ldap dependencies
setup_virtualenv tests/integration/ldap

# Setup tests dependencies.
# cd tests
# ./setup.sh
# cd ..
# TODO(gitbuda): Remove setup_virtualenv, replace it with tests/ve3. Take care
# of the build order because tests/setup.py builds pymgclient which depends on
# mgclient which is built after this script by calling make.

echo "Done installing dependencies for Memgraph"

echo "Linking git hooks"

libs/setup.sh
@@ -117,6 +117,10 @@ clone https://github.com/facebook/rocksdb.git rocksdb $rocksdb_tag
sed -i 's/TARGETS ${ROCKSDB_SHARED_LIB}/TARGETS ${ROCKSDB_SHARED_LIB} OPTIONAL/' rocksdb/CMakeLists.txt

# mgclient
-mgclient_tag="fe94b3631385ef5dbe40a3d8458860dbcc33e6ea" # May 27, 2019
+mgclient_tag="v1.2.0" # (2021-01-14)
clone https://github.com/memgraph/mgclient.git mgclient $mgclient_tag
sed -i 's/\${CMAKE_INSTALL_LIBDIR}/lib/' mgclient/src/CMakeLists.txt

# pymgclient
pymgclient_tag="4f85c179e56302d46a1e3e2cf43509db65f062b3" # (2021-01-15)
clone https://github.com/memgraph/pymgclient.git pymgclient $pymgclient_tag

tests/CMakeLists.txt
@@ -22,8 +22,8 @@ add_subdirectory(property_based)
# integration test binaries
add_subdirectory(integration)

-# feature benchmark test binaries
-add_subdirectory(feature_benchmark)
+# e2e test binaries
+add_subdirectory(e2e)

# mgbench benchmark test binaries
add_subdirectory(mgbench)

tests/e2e/.gitignore (new file, 1 line)
@@ -0,0 +1 @@
*.json

tests/e2e/CMakeLists.txt (new file, 1 line)
@@ -0,0 +1 @@
add_subdirectory(replication)

tests/e2e/memgraph.py (new executable file, 89 lines)
@@ -0,0 +1,89 @@
import copy
import os
import subprocess
import sys
import tempfile
import time

import mgclient

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")


def wait_for_server(port, delay=0.01):
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    count = 0
    while subprocess.call(cmd) != 0:
        time.sleep(0.01)
        if count > 10 / 0.01:
            print("Could not wait for server on port", port, "to startup!")
            sys.exit(1)
        count += 1
    time.sleep(delay)


def extract_bolt_port(args):
    for arg_index, arg in enumerate(args):
        if arg.startswith('--bolt-port='):
            maybe_port = arg.split('=')[1]
            if not maybe_port.isdigit():
                raise Exception('Unable to read Bolt port after --bolt-port=.')
            return int(maybe_port)
        elif arg == '--bolt-port':
            maybe_port = args[arg_index + 1]
            if not maybe_port.isdigit():
                raise Exception('Unable to read Bolt port after --bolt-port.')
            return int(maybe_port)
    return 7687


class MemgraphInstanceRunner():
    def __init__(self, binary_path=MEMGRAPH_BINARY, args=[]):
        self.host = '127.0.0.1'
        self.bolt_port = extract_bolt_port(args)
        self.binary_path = binary_path
        self.args = args
        self.proc_mg = None
        self.conn = None

    def query(self, query):
        cursor = self.conn.cursor()
        cursor.execute(query)
        return cursor.fetchall()

    def start(self, restart=False, args=[]):
        if not restart and self.is_running():
            return
        self.stop()
        self.args = copy.deepcopy(args)
        self.data_directory = tempfile.TemporaryDirectory()
        args_mg = [self.binary_path,
                   "--data-directory", self.data_directory.name,
                   "--storage-wal-enabled",
                   "--storage-snapshot-interval-sec", "300",
                   "--storage-properties-on-edges"] + self.args
        self.bolt_port = extract_bolt_port(args_mg)
        self.proc_mg = subprocess.Popen(args_mg)
        wait_for_server(self.bolt_port)
        self.conn = mgclient.connect(
            host=self.host,
            port=self.bolt_port)
        self.conn.autocommit = True
        assert self.is_running(), "The Memgraph process died!"

    def is_running(self):
        if self.proc_mg is None:
            return False
        if self.proc_mg.poll() is not None:
            return False
        return True

    def stop(self):
        if not self.is_running():
            return
        self.proc_mg.terminate()
        code = self.proc_mg.wait()
        assert code == 0, "The Memgraph process exited with non-zero!"

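For orientation, here is a minimal, hypothetical usage sketch of the MemgraphInstanceRunner class added above. The port value is illustrative only; the class is assumed to be importable from tests/e2e/memgraph.py, as runner.py below does.

# Hypothetical sketch: start one Memgraph instance on a non-default Bolt port,
# run a query through the pymgclient-backed helper, and stop the process.
from memgraph import MemgraphInstanceRunner

mg = MemgraphInstanceRunner()
mg.start(args=["--bolt-port", "7691"])  # port chosen only for this example
rows = mg.query("RETURN 1;")            # list of result tuples from pymgclient
assert rows[0][0] == 1
mg.stop()
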
tests/e2e/replication/CMakeLists.txt (new file, 5 lines)
@@ -0,0 +1,5 @@
add_executable(memgraph__e2e__replication__constraints constraints.cpp)
target_link_libraries(memgraph__e2e__replication__constraints glog gflags mgclient mg-utils mg-io Threads::Threads)

add_executable(memgraph__e2e__replication__read_write_benchmark read_write_benchmark.cpp)
target_link_libraries(memgraph__e2e__replication__read_write_benchmark glog gflags json mgclient mg-utils mg-io Threads::Threads)

tests/e2e/replication/common.hpp (new file, 71 lines)
@@ -0,0 +1,71 @@
#include <chrono>
#include <random>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "io/network/endpoint.hpp"
#include "mgclient.hpp"
#include "utils/string.hpp"

DEFINE_string(database_endpoints,
              "127.0.0.1:7687,127.0.0.1:7688,127.0.0.1:7689",
              "An array of database endpoints. Each endpoint is separated by "
              "comma. Within each endpoint, colon separates host and port. Use "
              "IPv4 addresses as hosts. First endpoint represents main "
              "replication instance.");
DEFINE_string(username, "", "Database username.");
DEFINE_string(password, "", "Database password.");
DEFINE_bool(use_ssl, false, "Use SSL connection.");
DEFINE_int32(nodes, 1000, "Number of nodes in DB.");
DEFINE_int32(edges, 5000, "Number of edges in DB.");
DEFINE_double(reads_duration_limit, 10.0,
              "How long should the client perform reads (seconds)");

namespace mg::e2e::replication {

auto ParseDatabaseEndpoints(const std::string &database_endpoints_str) {
  const auto db_endpoints_strs = utils::Split(database_endpoints_str, ",");
  std::vector<io::network::Endpoint> database_endpoints;
  for (const auto &db_endpoint_str : db_endpoints_strs) {
    const auto maybe_host_port =
        io::network::Endpoint::ParseSocketOrIpAddress(db_endpoint_str, 7687);
    CHECK(maybe_host_port);
    database_endpoints.emplace_back(
        io::network::Endpoint(maybe_host_port->first, maybe_host_port->second));
  }
  return database_endpoints;
}

auto Connect(const io::network::Endpoint &database_endpoint) {
  mg::Client::Params params;
  params.host = database_endpoint.address;
  params.port = database_endpoint.port;
  params.use_ssl = FLAGS_use_ssl;
  auto client = mg::Client::Connect(params);
  if (!client) {
    LOG(FATAL) << "Failed to connect!";
  }
  return client;
}

class IntGenerator {
 public:
  IntGenerator(const std::string &purpose, int start, int end)
      : seed_(std::chrono::high_resolution_clock::now()
                  .time_since_epoch()
                  .count()),
        rng_(seed_),
        dist_(start, end) {
    LOG(INFO) << purpose << " int generator seed: " << seed_;
  }

  int Next() { return dist_(rng_); }

 private:
  uint64_t seed_;
  std::mt19937 rng_;
  std::uniform_int_distribution<int> dist_;
};

}  // namespace mg::e2e::replication

tests/e2e/replication/constraints.cpp (new file, 146 lines)
@@ -0,0 +1,146 @@
#include <chrono>
#include <random>
#include <ranges>
#include <thread>
#include <unordered_set>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "common.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"

int main(int argc, char **argv) {
  google::SetUsageMessage("Memgraph E2E Replication Read-write Benchmark");
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);
  auto database_endpoints =
      mg::e2e::replication::ParseDatabaseEndpoints(FLAGS_database_endpoints);

  mg::Client::Init();

  {
    auto client = mg::e2e::replication::Connect(database_endpoints[0]);
    client->Execute("MATCH (n) DETACH DELETE n;");
    client->DiscardAll();
    client->Execute("CREATE INDEX ON :Node(id);");
    client->DiscardAll();
    client->Execute("CREATE CONSTRAINT ON (n:Node) ASSERT n.id IS UNIQUE;");
    client->DiscardAll();

    // Sleep a bit so the constraints get replicated.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (const auto &database_endpoint : database_endpoints) {
      auto client = mg::e2e::replication::Connect(database_endpoint);
      client->Execute("SHOW CONSTRAINT INFO;");
      if (const auto data = client->FetchAll()) {
        const auto label_name = (*data)[0][1].ValueString();
        const auto property_name = (*data)[0][2].ValueList()[0].ValueString();
        if (label_name != "Node" || property_name != "id") {
          LOG(FATAL) << database_endpoint
                     << " does NOT have a valid constraint created.";
        }
      } else {
        LOG(FATAL) << "Unable to get CONSTRAINT INFO from "
                   << database_endpoint;
      }
    }
    LOG(INFO) << "All constraints are in-place.";

    for (int i = 0; i < FLAGS_nodes; ++i) {
      client->Execute("CREATE (:Node {id:" + std::to_string(i) + "});");
      client->DiscardAll();
    }
    mg::e2e::replication::IntGenerator edge_generator("EdgeCreateGenerator", 0,
                                                      FLAGS_nodes - 1);
    for (int i = 0; i < FLAGS_edges; ++i) {
      client->Execute("MATCH (n {id:" + std::to_string(edge_generator.Next()) +
                      "}), (m {id:" + std::to_string(edge_generator.Next()) +
                      "}) CREATE (n)-[:Edge]->(m);");
      client->DiscardAll();
    }
  }

  {
    const int num_threads = std::thread::hardware_concurrency();
    std::vector<std::thread> threads;
    std::vector<double> thread_duration;
    threads.reserve(num_threads);
    thread_duration.resize(num_threads);

    for (int i = 0; i < num_threads; ++i) {
      const auto &database_endpoint =
          database_endpoints[i % database_endpoints.size()];
      threads.emplace_back([i, &database_endpoint,
                            cluster_size = database_endpoints.size(),
                            &local_duration = thread_duration[i]] {
        auto client = mg::e2e::replication::Connect(database_endpoint);
        mg::e2e::replication::IntGenerator node_update_generator(
            fmt::format("NodeUpdateGenerator {}", i), 0, FLAGS_nodes - 1);
        utils::Timer t;

        while (true) {
          local_duration = t.Elapsed().count();
          if (local_duration >= FLAGS_reads_duration_limit) break;
          // In the main case try to update.
          if (i % cluster_size == 0) {
            try {
              client->Execute("MATCH (n:Node {id:" +
                              std::to_string(node_update_generator.Next()) +
                              "}) SET n.id = " +
                              std::to_string(node_update_generator.Next()) +
                              " RETURN n.id;");
              client->FetchAll();
            } catch (const std::exception &e) {
              // Pass.
            }
          } else {  // In the replica case fetch all unique ids.
            try {
              client->Execute("MATCH (n) RETURN n.id;");
              const auto data = client->FetchAll();
              std::unordered_set<int64_t> unique;
              for (const auto &value : *data) {
                unique.insert(value[0].ValueInt());
              }
              if ((*data).size() != unique.size()) {
                LOG(FATAL) << "Some ids are equal.";
              }
            } catch (const std::exception &e) {
              LOG(FATAL) << e.what();
              break;
            }
          }
        }
      });
    }

    for (auto &t : threads) {
      if (t.joinable()) t.join();
    }
  }

  {
    auto client = mg::e2e::replication::Connect(database_endpoints[0]);
    client->Execute("DROP CONSTRAINT ON (n:Node) ASSERT n.id IS UNIQUE");
    client->DiscardAll();
    // Sleep a bit so the drop constraints get replicated.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (const auto &database_endpoint : database_endpoints) {
      auto client = mg::e2e::replication::Connect(database_endpoint);
      client->Execute("SHOW CONSTRAINT INFO;");
      if (const auto data = client->FetchAll()) {
        if ((*data).size() != 0) {
          LOG(FATAL) << database_endpoint << " still has some constraints.";
        }
      } else {
        LOG(FATAL) << "Unable to get CONSTRAINT INFO from "
                   << database_endpoint;
      }
    }
    LOG(INFO) << "All constraints were deleted.";
  }

  return 0;
}

tests/e2e/replication/read_write_benchmark.cpp (new file, 153 lines)
@@ -0,0 +1,153 @@
#include <chrono>
#include <fstream>
#include <random>
#include <ranges>
#include <thread>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <json/json.hpp>

#include "common.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"

DEFINE_string(output_file,
              "memgraph__e2e__replication__read_write_benchmark.json",
              "Output file where the results should be in JSON format.");

int main(int argc, char **argv) {
  google::SetUsageMessage("Memgraph E2E Replication Read-write Benchmark");
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);
  const auto database_endpoints =
      mg::e2e::replication::ParseDatabaseEndpoints(FLAGS_database_endpoints);
  nlohmann::json output;
  output["nodes"] = FLAGS_nodes;
  output["edges"] = FLAGS_edges;

  mg::Client::Init();

  {
    auto client = mg::e2e::replication::Connect(database_endpoints[0]);
    client->Execute("MATCH (n) DETACH DELETE n;");
    client->DiscardAll();
    client->Execute("CREATE INDEX ON :Node(id);");
    client->DiscardAll();

    // Sleep a bit so the index gets replicated.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (const auto &database_endpoint : database_endpoints) {
      auto client = mg::e2e::replication::Connect(database_endpoint);
      client->Execute("SHOW INDEX INFO;");
      if (auto data = client->FetchAll()) {
        auto label_name = (*data)[0][1].ValueString();
        auto property_name = (*data)[0][2].ValueString();
        if (label_name != "Node" || property_name != "id") {
          LOG(FATAL) << database_endpoint
                     << " does NOT have valid indexes created.";
        }
      } else {
        LOG(FATAL) << "Unable to get INDEX INFO from " << database_endpoint;
      }
    }
    LOG(INFO) << "All indexes are in-place.";

    utils::Timer node_write_timer;
    for (int i = 0; i < FLAGS_nodes; ++i) {
      client->Execute("CREATE (:Node {id:" + std::to_string(i) + "});");
      client->DiscardAll();
    }
    output["node_write_time"] = node_write_timer.Elapsed().count();

    mg::e2e::replication::IntGenerator edge_generator("EdgeCreateGenerator", 0,
                                                      FLAGS_nodes - 1);
    utils::Timer edge_write_timer;
    for (int i = 0; i < FLAGS_edges; ++i) {
      client->Execute("MATCH (n {id:" + std::to_string(edge_generator.Next()) +
                      "}), (m {id:" + std::to_string(edge_generator.Next()) +
                      "}) CREATE (n)-[:Edge]->(m);");
      client->DiscardAll();
    }
    output["edge_write_time"] = edge_write_timer.Elapsed().count();
  }

  {  // Benchmark read queries.
    const int num_threads = std::thread::hardware_concurrency();
    std::atomic<int64_t> query_counter{0};
    std::vector<std::thread> threads;
    std::vector<double> thread_duration;
    threads.reserve(num_threads);
    thread_duration.resize(num_threads);

    for (int i = 0; i < num_threads; ++i) {
      const auto &database_endpoint =
          database_endpoints[i % database_endpoints.size()];
      threads.emplace_back([i, &database_endpoint, &query_counter,
                            &local_duration = thread_duration[i]] {
        utils::ThreadSetName(fmt::format("BenchWriter{}", i));
        auto client = mg::e2e::replication::Connect(database_endpoint);
        mg::e2e::replication::IntGenerator node_generator(
            fmt::format("NodeReadGenerator {}", i), 0, FLAGS_nodes - 1);
        utils::Timer t;

        while (true) {
          local_duration = t.Elapsed().count();
          if (local_duration >= FLAGS_reads_duration_limit) break;
          try {
            client->Execute(
                "MATCH (n {id:" + std::to_string(node_generator.Next()) +
                "})-[e]->(m) RETURN e, m;");
            client->DiscardAll();
            query_counter.fetch_add(1);
          } catch (const std::exception &e) {
            LOG(FATAL) << e.what();
            break;
          }
        }
      });
    }

    for (auto &t : threads) {
      if (t.joinable()) t.join();
    }

    double all_duration = 0;
    for (auto &d : thread_duration) all_duration += d;
    output["total_read_duration"] = all_duration;
    double per_thread_read_duration = all_duration / num_threads;
    output["per_thread_read_duration"] = per_thread_read_duration;
    output["read_per_second"] = query_counter / per_thread_read_duration;
    output["read_queries"] = query_counter.load();

    LOG(INFO) << "Output data: " << output.dump();
    std::ofstream output_file(FLAGS_output_file);
    output_file << output.dump();
    output_file.close();
  }

  {
    auto client = mg::e2e::replication::Connect(database_endpoints[0]);
    client->Execute("DROP INDEX ON :Node(id);");
    client->DiscardAll();
    // Sleep a bit so the drop index gets replicated.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    for (const auto &database_endpoint : database_endpoints) {
      auto client = mg::e2e::replication::Connect(database_endpoint);
      client->Execute("SHOW INDEX INFO;");
      if (const auto data = client->FetchAll()) {
        if ((*data).size() != 0) {
          LOG(FATAL) << database_endpoint << " still has some indexes.";
        }
      } else {
        LOG(FATAL) << "Unable to get INDEX INFO from " << database_endpoint;
      }
    }
    LOG(INFO) << "All indexes were deleted.";
  }

  mg::Client::Finalize();

  return 0;
}

tests/e2e/replication/workloads.yaml (new file, 45 lines)
@@ -0,0 +1,45 @@
template_test_nodes_query: &template_test_nodes_query
  - query: "MATCH (n) RETURN count(n);"
    expected: 1000
template_test_edge_query: &template_test_edges_query
  - query: "MATCH (n)-[r]->(m) RETURN count(r);"
    expected: 5000
template_validation_queries: &template_validation_queries
  validation_queries:
    - <<: *template_test_nodes_query
    - <<: *template_test_edges_query
template_cluster: &template_cluster
  cluster:
    replica_1:
      args: ["--bolt-port", "7688"]
      setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10001;"]
      <<: *template_validation_queries
    replica_2:
      args: ["--bolt-port", "7689"]
      setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10002;"]
      <<: *template_validation_queries
    replica_3:
      args: ["--bolt-port", "7690"]
      setup_queries: ["SET REPLICATION ROLE TO REPLICA WITH PORT 10003;"]
      <<: *template_validation_queries
    main:
      args: ["--bolt-port", "7687"]
      setup_queries: [
        "REGISTER REPLICA replica_1 SYNC WITH TIMEOUT 0 TO '127.0.0.1:10001'",
        "REGISTER REPLICA replica_2 SYNC WITH TIMEOUT 1 TO '127.0.0.1:10002'",
        "REGISTER REPLICA replica_3 ASYNC TO '127.0.0.1:10003'"
      ]
      <<: *template_validation_queries

workloads:
  - name: "Constraints"
    binary: "tests/e2e/replication/memgraph__e2e__replication__constraints"
    args: []
    <<: *template_cluster

  - name: "Read-write benchmark"
    binary: "tests/e2e/replication/memgraph__e2e__replication__read_write_benchmark"
    args: []
    <<: *template_cluster

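The YAML anchors above keep the per-instance configuration in one place. A quick, hypothetical way to inspect what a cluster entry expands to, using the same yaml.FullLoader that runner.py below relies on:

# Hypothetical inspection snippet: print the fully expanded config of one replica
# so the merged args/setup_queries/validation_queries are visible.
import pprint
import yaml

with open("replication/workloads.yaml") as f:  # path relative to tests/e2e
    workloads = yaml.load(f, Loader=yaml.FullLoader)["workloads"]
pprint.pprint(workloads[0]["cluster"]["replica_1"])
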
tests/e2e/runner.py (new executable file, 71 lines)
@@ -0,0 +1,71 @@
from argparse import ArgumentParser
import atexit
import logging
import os
import subprocess
import yaml

from memgraph import MemgraphInstanceRunner

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", ".."))
BUILD_DIR = os.path.join(PROJECT_DIR, "build")
MEMGRAPH_BINARY = os.path.join(BUILD_DIR, "memgraph")

log = logging.getLogger("memgraph.tests.e2e.replication")


def load_args():
    parser = ArgumentParser()
    parser.add_argument("--workloads-path", required=True)
    parser.add_argument("--workload-name", default=None, required=False)
    return parser.parse_args()


def load_workloads(path):
    with open(path, "r") as f:
        return yaml.load(f, Loader=yaml.FullLoader)['workloads']


def run(args):
    workloads = load_workloads(args.workloads_path)
    for workload in workloads:
        workload_name = workload['name']
        if args.workload_name is not None and \
                args.workload_name != workload_name:
            continue
        log.info("%s STARTED.", workload_name)
        # Setup.
        mg_instances = {}

        @atexit.register
        def cleanup():
            for mg_instance in mg_instances.values():
                mg_instance.stop()
        for name, config in workload['cluster'].items():
            mg_instance = MemgraphInstanceRunner(MEMGRAPH_BINARY)
            mg_instances[name] = mg_instance
            mg_instance.start(args=config['args'])
            for query in config['setup_queries']:
                mg_instance.query(query)
        # Test.
        mg_test_binary = os.path.join(BUILD_DIR, workload['binary'])
        subprocess.run(
            [mg_test_binary] + workload['args'],
            check=True,
            stderr=subprocess.STDOUT)
        # Validation.
        for name, config in workload['cluster'].items():
            for validation in config['validation_queries']:
                mg_instance = mg_instances[name]
                data = mg_instance.query(validation['query'])[0][0]
                assert data == validation['expected']
        cleanup()
        log.info("%s PASSED.", workload_name)


if __name__ == '__main__':
    args = load_args()
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s %(asctime)s %(name)s] %(message)s')
    run(args)

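A hedged example of driving the new runner locally, mirroring the CI step at the top of this diff. The LD_LIBRARY_PATH entry and the --workload-name filter come from the workflow snippet and runner.py's argument parser; the snippet assumes it is launched from the tests directory of a built source tree, with tests/setup.sh already executed and the ve3 virtualenv active.

# Hypothetical local invocation from the `tests` directory of a built tree,
# running only the "Constraints" workload defined in workloads.yaml.
import os
import subprocess

env = dict(os.environ)
env["LD_LIBRARY_PATH"] = env.get("LD_LIBRARY_PATH", "") + ":../../libs/mgclient/lib"
subprocess.run(
    ["python", "runner.py",
     "--workloads-path", "replication/workloads.yaml",
     "--workload-name", "Constraints"],
    cwd="e2e", env=env, check=True)
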
tests/feature_benchmark/CMakeLists.txt (deleted)
@@ -1,2 +0,0 @@
## ha test binaries
#add_subdirectory(ha)

tests/feature_benchmark/ha/CMakeLists.txt (deleted)
@@ -1,3 +0,0 @@
# test binaries
add_subdirectory(read)
add_subdirectory(write)

tests/feature_benchmark/ha/read/CMakeLists.txt (deleted)
@@ -1,6 +0,0 @@
set(target_name memgraph__feature_benchmark__ha__read)

set(benchmark_target_name ${target_name}__benchmark)
add_executable(${benchmark_target_name} benchmark.cpp)
set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark)
target_link_libraries(${benchmark_target_name} mg-utils mg-communication)

tests/feature_benchmark/ha/read/benchmark.cpp (deleted)
@@ -1,127 +0,0 @@
#include <atomic>
#include <chrono>
#include <fstream>
#include <optional>
#include <random>
#include <thread>

#include <fmt/format.h>
#include <gflags/gflags.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/flag_validation.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"

using namespace std::literals::chrono_literals;

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_double(duration, 10.0,
              "How long should the client perform reads (seconds)");
DEFINE_string(output_file, "", "Output file where the results should be.");
DEFINE_int32(nodes, 1000, "Number of nodes in DB");
DEFINE_int32(edges, 5000, "Number of edges in DB");

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::SetUsageMessage("Memgraph HA read benchmark client");
  google::InitGoogleLogging(argv[0]);

  std::atomic<int64_t> query_counter{0};

  std::vector<io::network::Endpoint> endpoints;
  for (int i = 0; i < FLAGS_cluster_size; ++i) {
    uint16_t port = FLAGS_port + i;
    io::network::Endpoint endpoint{FLAGS_address, port};
    endpoints.push_back(endpoint);
  }

  // Populate the db (random graph with given number of nodes and edges)
  // Raft reelection constants are between 300ms to 500ms, so we
  // use 1000ms to avoid using up all retries unnecessarily during reelection.
  std::chrono::milliseconds retry_delay(1000);
  communication::ClientContext context(FLAGS_use_ssl);
  communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                       FLAGS_password, 10, retry_delay);

  for (int i = 0; i < FLAGS_nodes; ++i) {
    client.Execute("CREATE (:Node {id:" + std::to_string(i) + "})", {});
  }

  auto seed =
      std::chrono::high_resolution_clock::now().time_since_epoch().count();
  std::mt19937 rng(seed);
  std::uniform_int_distribution<int> dist(0, FLAGS_nodes - 1);

  for (int i = 0; i < FLAGS_edges; ++i) {
    int a = dist(rng), b = dist(rng);
    client.Execute("MATCH (n {id:" + std::to_string(a) + "})," +
                   " (m {id:" + std::to_string(b) + "})" +
                   "CREATE (n)-[:Edge]->(m);", {});
  }

  const int num_threads = std::thread::hardware_concurrency();
  std::vector<std::thread> threads;
  std::vector<double> thread_duration;
  threads.reserve(num_threads);
  thread_duration.resize(num_threads);

  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([i, &endpoints, &query_counter, retry_delay,
                          &local_duration = thread_duration[i]]() {
      utils::ThreadSetName(fmt::format("BenchWriter{}", i));
      communication::ClientContext context(FLAGS_use_ssl);
      communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                           FLAGS_password, 10, retry_delay);

      auto seed =
          std::chrono::high_resolution_clock::now().time_since_epoch().count();
      std::mt19937 rng(seed);
      std::uniform_int_distribution<int> dist(0, FLAGS_nodes - 1);

      utils::Timer t;
      while (true) {
        local_duration = t.Elapsed().count();
        if (local_duration >= FLAGS_duration) break;
        int id = dist(rng);

        try {
          client.Execute("MATCH (n {id:" + std::to_string(id) +
                         "})-[e]->(m) RETURN e, m;", {});
          query_counter.fetch_add(1);
        } catch (const communication::bolt::ClientQueryException &e) {
          LOG(WARNING) << e.what();
          break;
        } catch (const communication::bolt::ClientFatalException &e) {
          LOG(WARNING) << e.what();
          break;
        }
      }
    });
  }

  for (auto &t : threads) {
    if (t.joinable()) t.join();
  }

  double duration = 0;
  for (auto &d : thread_duration) duration += d;
  duration /= num_threads;

  double read_per_second = query_counter / duration;

  std::ofstream output(FLAGS_output_file);
  output << "duration " << duration << std::endl;
  output << "executed_reads " << query_counter << std::endl;
  output << "read_per_second " << read_per_second << std::endl;
  output.close();

  return 0;
}

tests/feature_benchmark/ha/read/coordination.json (deleted)
@@ -1,5 +0,0 @@
[
  [1, "127.0.0.1", 10000],
  [2, "127.0.0.1", 10001],
  [3, "127.0.0.1", 10002]
]

tests/feature_benchmark/ha/read/raft.json (deleted)
@@ -1,7 +0,0 @@
{
  "election_timeout_min": 350,
  "election_timeout_max": 700,
  "heartbeat_interval": 100,
  "replication_timeout": 10000,
  "log_size_snapshot_threshold": -1
}

tests/feature_benchmark/ha/read/runner.sh (deleted)
@@ -1,96 +0,0 @@
#!/bin/bash

## Helper functions

function wait_for_server {
    port=$1
    while ! nc -z -w 1 127.0.0.1 $port; do
        sleep 0.1
    done
    sleep 1
}

function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }

## Environment setup

# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"

# Find memgraph binaries.
binary_dir="$DIR/../../../../build"

# Results for apollo
RESULTS="$DIR/.apollo_measurements"

# Benchmark parameters
DURATION=10

# Startup
declare -a HA_PIDS
declare -a exit_codes

for server_id in 1 2 3
do
    $binary_dir/memgraph_ha --server_id $server_id \
        --coordination_config_file="coordination.json" \
        --raft_config_file="raft.json" \
        --bolt-port $((7686 + $server_id)) \
        --db-recover-on-startup=false \
        --durability_directory=dur$server_id &
    HA_PIDS[$server_id]=$!
    wait_for_server $((7686 + $server_id))
done

# Allow some time for leader election.
sleep 10

# Start the memgraph process and wait for it to start.
echo_info "Starting HA read benchmark"
$binary_dir/tests/feature_benchmark/ha/read/benchmark \
    --duration=$DURATION \
    --output-file=$RESULTS &
pid=$!

wait -n $pid
exit_codes[0]=$?

# Shutdown
for server_id in 1 2 3
do
    kill -15 ${HA_PIDS[$server_id]}
done

# Cleanup
for server_id in 1 2 3
do
    wait -n ${HA_PIDS[$server_id]}
    exit_codes[$server_id]=$?
    rm -r dur$server_id
done

failure=0

for i in `seq 0 3`; do
    code=${exit_codes[$i]}
    if [ $code -ne 0 ]; then
        if [ $i -eq 0 ]; then
            echo_failure "Benchmark exited with status $code"
        else
            echo_failure "Memgraph HA server $i exited with status $code"
        fi

        failure=1
    fi
done

if [ $failure -eq 0 ]; then
    echo_success "Benchmark finished successfully"
else
    echo_failure "Benchmark didn't finish successfully"
fi

exit $failure

tests/feature_benchmark/ha/write/CMakeLists.txt (deleted)
@@ -1,6 +0,0 @@
set(target_name memgraph__feature_benchmark__ha__write)

set(benchmark_target_name ${target_name}__benchmark)
add_executable(${benchmark_target_name} benchmark.cpp)
set_target_properties(${benchmark_target_name} PROPERTIES OUTPUT_NAME benchmark)
target_link_libraries(${benchmark_target_name} mg-utils mg-communication)

tests/feature_benchmark/ha/write/benchmark.cpp (deleted)
@@ -1,94 +0,0 @@
#include <atomic>
#include <chrono>
#include <fstream>
#include <optional>
#include <thread>

#include <fmt/format.h>
#include <gflags/gflags.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/flag_validation.hpp"
#include "utils/thread.hpp"
#include "utils/timer.hpp"

using namespace std::literals::chrono_literals;

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_double(duration, 10.0,
              "How long should the client perform writes (seconds)");
DEFINE_string(output_file, "", "Output file where the results should be.");

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::SetUsageMessage("Memgraph HA write benchmark client");
  google::InitGoogleLogging(argv[0]);

  std::atomic<int64_t> query_counter{0};

  std::vector<io::network::Endpoint> endpoints;
  for (int i = 0; i < FLAGS_cluster_size; ++i) {
    uint16_t port = FLAGS_port + i;
    io::network::Endpoint endpoint{FLAGS_address, port};
    endpoints.push_back(endpoint);
  }

  const int num_threads = std::thread::hardware_concurrency();
  std::vector<std::thread> threads;
  std::vector<double> thread_duration;
  threads.reserve(num_threads);
  thread_duration.resize(num_threads);

  for (int i = 0; i < num_threads; ++i) {
    threads.emplace_back([i, endpoints = endpoints, &query_counter,
                          &local_duration = thread_duration[i]]() {
      utils::ThreadSetName(fmt::format("BenchWriter{}", i));
      communication::ClientContext context(FLAGS_use_ssl);
      std::chrono::milliseconds retry_delay(1000);
      communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                           FLAGS_password, 10, retry_delay);

      utils::Timer t;
      while (true) {
        local_duration = t.Elapsed().count();
        if (local_duration >= FLAGS_duration) break;

        try {
          client.Execute("CREATE (:Node)", {});
          query_counter.fetch_add(1);
        } catch (const communication::bolt::ClientQueryException &e) {
          LOG(WARNING) << e.what();
          break;
        } catch (const communication::bolt::ClientFatalException &e) {
          LOG(WARNING) << e.what();
          break;
        }
      }
    });
  }

  for (auto &t : threads) {
    if (t.joinable()) t.join();
  }

  double duration = 0;
  for (auto &d : thread_duration) duration += d;
  duration /= num_threads;

  double write_per_second = query_counter / duration;

  std::ofstream output(FLAGS_output_file);
  output << "duration " << duration << std::endl;
  output << "executed_writes " << query_counter << std::endl;
  output << "write_per_second " << write_per_second << std::endl;
  output.close();

  return 0;
}

tests/feature_benchmark/ha/write/coordination.json (deleted)
@@ -1,5 +0,0 @@
[
  [1, "127.0.0.1", 10000],
  [2, "127.0.0.1", 10001],
  [3, "127.0.0.1", 10002]
]

tests/feature_benchmark/ha/write/raft.json (deleted)
@@ -1,7 +0,0 @@
{
  "election_timeout_min": 350,
  "election_timeout_max": 700,
  "heartbeat_interval": 100,
  "replication_timeout": 10000,
  "log_size_snapshot_threshold": -1
}

tests/feature_benchmark/ha/write/runner.sh (deleted)
@@ -1,96 +0,0 @@
#!/bin/bash

## Helper functions

function wait_for_server {
    port=$1
    while ! nc -z -w 1 127.0.0.1 $port; do
        sleep 0.1
    done
    sleep 1
}

function echo_info { printf "\033[1;36m~~ $1 ~~\033[0m\n"; }
function echo_success { printf "\033[1;32m~~ $1 ~~\033[0m\n\n"; }
function echo_failure { printf "\033[1;31m~~ $1 ~~\033[0m\n\n"; }

## Environment setup

# Get script location.
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
cd "$DIR"

# Find memgraph binaries.
binary_dir="$DIR/../../../../build"

# Results for apollo
RESULTS="$DIR/.apollo_measurements"

# Benchmark parameters
DURATION=10

# Startup
declare -a HA_PIDS
declare -a exit_codes

for server_id in 1 2 3
do
    $binary_dir/memgraph_ha --server_id $server_id \
        --coordination_config_file="coordination.json" \
        --raft_config_file="raft.json" \
        --bolt-port $((7686 + $server_id)) \
        --db-recover-on-startup=false \
        --durability_directory=dur$server_id &
    HA_PIDS[$server_id]=$!
    wait_for_server $((7686 + $server_id))
done

# Allow some time for leader election.
sleep 10

# Start the memgraph process and wait for it to start.
echo_info "Starting HA write benchmark"
$binary_dir/tests/feature_benchmark/ha/write/benchmark \
    --duration=$DURATION \
    --output-file=$RESULTS &
pid=$!

wait -n $pid
exit_codes[0]=$?

# Shutdown
for server_id in 1 2 3
do
    kill -15 ${HA_PIDS[$server_id]}
done

# Cleanup
for server_id in 1 2 3
do
    wait -n ${HA_PIDS[$server_id]}
    exit_codes[$server_id]=$?
    rm -r dur$server_id
done

failure=0

for i in `seq 0 3`; do
    code=${exit_codes[$i]}
    if [ $code -ne 0 ]; then
        if [ $i -eq 0 ]; then
            echo_failure "Benchmark exited with status $code"
        else
            echo_failure "Memgraph HA server $i exited with status $code"
        fi

        failure=1
    fi
done

if [ $failure -eq 0 ]; then
    echo_success "Benchmark finished successfully"
else
    echo_failure "Benchmark didn't finish successfully"
fi

exit $failure

tests/integration/ha/basic/CMakeLists.txt (deleted)
@@ -1,6 +0,0 @@
set(target_name memgraph__integration__ha_basic)
set(tester_target_name ${target_name}__tester)

add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)

tests/integration/ha/basic/raft.json (deleted)
@@ -1,7 +0,0 @@
{
  "election_timeout_min": 750,
  "election_timeout_max": 1000,
  "heartbeat_interval": 100,
  "replication_timeout": 10000,
  "log_size_snapshot_threshold": -1
}

tests/integration/ha/basic/runner.py (deleted)
@@ -1,135 +0,0 @@
#!/usr/bin/python3

"""
This test checks the the basic functionality of HA Memgraph. It incorporates
both leader election and log replication processes.

The test proceeds as follows for clusters of size 3 and 5:
1) Start the whole cluster
2) Kill random workers but leave the majority alive
3) Create a single Node
4) Bring dead nodes back to life
5) Kill random workers but leave the majority alive
6) Check if everything is ok with DB state
7) GOTO 1) and repeat 25 times
"""

import argparse
import os
import time
import subprocess
import sys
import random

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase


class HaBasicTest(HaTestBase):
    def execute_step(self, step, node_count):
        if step == "create":
            print("Executing create query")
            client = subprocess.Popen([self.tester_binary,
                                       "--step", "create",
                                       "--cluster-size", str(self.cluster_size),
                                       "--node-count", str(node_count)])
        elif step == "count":
            print("Executing count query")
            client = subprocess.Popen([self.tester_binary,
                                       "--step", "count",
                                       "--cluster_size", str(self.cluster_size),
                                       "--node-count", str(node_count)])
        else:
            return 0

        # Check what happened with query execution.
        try:
            code = client.wait(timeout=30)
        except subprocess.TimeoutExpired as e:
            print("HA client timed out!")
            client.kill()
            return 1

        return code

    def start_workers(self, worker_ids):
        for wid in worker_ids:
            print("Starting worker {}".format(wid + 1))
            self.start_worker(wid)

    def kill_workers(self, worker_ids):
        for wid in worker_ids:
            print("Killing worker {}".format(wid + 1))
            self.kill_worker(wid)

    def execute(self):
        self.start_cluster()

        expected_results = 0

        # Make sure at least one node exists.
        assert self.execute_step("create", expected_results) == 0, \
            "Error while executing create query"
        expected_results = 1

        for i in range(20):
            # Create step
            partition = random.sample(range(self.cluster_size),
                random.randint(0, int((self.cluster_size - 1) / 2)))

            self.kill_workers(partition)

            assert self.execute_step("create", expected_results) == 0, \
                "Error while executing create query"
            expected_results += 1

            self.start_workers(partition)

            # Check step
            partition = random.sample(range(self.cluster_size),
                random.randint(0, int((self.cluster_size - 1) / 2)))

            self.kill_workers(partition)

            assert self.execute_step("count", expected_results) == 0, \
                "Error while executing count query"

            self.start_workers(partition)

        # Check that no data was lost.
        assert self.execute_step("count", expected_results) == 0, \
            "Error while executing count query"


def find_correct_path(path):
    return os.path.join(PROJECT_DIR, "build", path)


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
                                                   "basic", "tester"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
                                    "basic", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
        HaBasicTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)

tests/integration/ha/basic/tester.cpp (deleted)
@@ -1,67 +0,0 @@
#include <chrono>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_int32(node_count, -1, "Expected number of nodes in the database.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "The step to execute (available: create, count)");

using namespace std::chrono_literals;

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  communication::SSLInit sslInit;

  try {
    std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
    for (int i = 0; i < FLAGS_cluster_size; ++i)
      endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);

    std::chrono::milliseconds retry_delay(1000);
    communication::ClientContext context(FLAGS_use_ssl);
    communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                         FLAGS_password, 25, retry_delay);

    if (FLAGS_step == "create") {
      client.Execute("create (:Node {id: $id})",
                     {{"id", FLAGS_node_count + 1}});
      return 0;
    } else if (FLAGS_step == "count") {
      auto result = client.Execute("match (n) return n", {});
      if (result.records.size() != FLAGS_node_count) {
        LOG(WARNING) << "Missing data: expected " << FLAGS_node_count
                     << ", got " << result.records.size();
        return 2;
      }
      return 0;
    } else {
      LOG(FATAL) << "Unexpected client step!";
    }
  } catch (const communication::bolt::ClientQueryException &e) {
    LOG(WARNING)
        << "Transient error while executing query. (eg. mistyped query, etc.)\n"
        << e.what();
  } catch (const communication::bolt::ClientFatalException &e) {
    LOG(WARNING) << "Couldn't connect to server\n" << e.what();
  } catch (const utils::BasicException &e) {
    LOG(WARNING) << "Error while executing query\n" << e.what();
  }

  // The test wasn't successfull
  return 1;
}

tests/integration/ha/constraints/CMakeLists.txt (deleted)
@@ -1,6 +0,0 @@
set(target_name memgraph__integration__ha_constraints)
set(tester_target_name ${target_name}__tester)

add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)

tests/integration/ha/constraints/raft.json (deleted)
@@ -1,7 +0,0 @@
{
  "election_timeout_min": 200,
  "election_timeout_max": 500,
  "heartbeat_interval": 100,
  "replication_timeout": 10000,
  "log_size_snapshot_threshold": -1
}

tests/integration/ha/constraints/runner.py (deleted)
@@ -1,119 +0,0 @@
#!/usr/bin/python3

import argparse
import os
import time
import random
import subprocess
import sys

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase


class HaIndexTest(HaTestBase):
    def execute_step(self, step, property_value=None, expected_status=None,
                     expected_result=None):
        if step == "create":
            print("Executing create query")
            client = subprocess.Popen([self.tester_binary, "--step", "create",
                "--cluster_size", str(self.cluster_size)])

        elif step == "drop":
            print("Executing drop query")
            client = subprocess.Popen([self.tester_binary, "--step", "drop",
                "--cluster_size", str(self.cluster_size)])
        elif step == "add_node":
            print("Executing add_node query ")
            client = subprocess.Popen([self.tester_binary, "--step", "add_node",
                "--cluster_size", str(self.cluster_size), "--expected_status",
                str(expected_status), "--property_value", str(property_value)])

        elif step == "check":
            print("Executing check query")
            client = subprocess.Popen([self.tester_binary, "--step", "check",
                "--cluster_size", str(self.cluster_size), "--expected_result",
                str(expected_result)])
        else:
            raise ValueError("Invalid step argument: " + step)

        # Check what happened with query execution.
        try:
            code = client.wait(timeout=30)
        except subprocess.TimeoutExpired as e:
            print("HA client timed out!")
            client.kill()
            return 1

        return code

    def execute(self):
        self.start_cluster()
        num_nodes = 1

        assert self.execute_step("add_node", expected_status=0, \
            property_value=num_nodes) == 0, \
            "Error while executing add_node query"

        assert self.execute_step("create") == 0, \
            "Error while executing create query"

        assert self.execute_step("check", expected_result=num_nodes) == 0, \
            "Error while executing check query"

        for i in range(self.cluster_size):
            # Kill worker.
            print("Killing worker {}".format(i))
            self.kill_worker(i)

            assert self.execute_step("add_node", expected_status=1, \
                property_value=num_nodes) == 0, \
                "Error while executing add_node query"

            assert self.execute_step("add_node", expected_status=0, \
                property_value=num_nodes + 1) == 0, \
                "Error while executing add_node query"

            num_nodes += 1

            # Bring worker back to life.
            print("Starting worker {}".format(i))
            self.start_worker(i)

        assert self.execute_step("drop") == 0, \
            "Error while executing drop query"

        assert self.execute_step("check", expected_result=num_nodes) == 0, \
            "Error while executing check query"


def find_correct_path(path):
    return os.path.join(PROJECT_DIR, "build", path)


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
                                                   "constraints", "tester"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
                                    "constraints", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
        HaIndexTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)

tests/integration/ha/constraints/tester.cpp (deleted)
@@ -1,99 +0,0 @@
#include <chrono>
#include <thread>
#include <vector>

#include <fmt/format.h>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "communication/bolt/ha_client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"

DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_int32(num_retries, 20, "Number of (leader) execution retries.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");

DEFINE_string(step, "",
              "The step to execute (available: create, check, add_node, drop");
DEFINE_int32(property_value, 0, "Value of the property when creating a node.");
DEFINE_int32(
    expected_status, 0,
    "Expected query execution status when creating a node, 0 is success");
DEFINE_int32(expected_result, 0, "Expected query result");

using namespace std::chrono_literals;

int main(int argc, char **argv) {
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  google::InitGoogleLogging(argv[0]);

  communication::SSLInit sslInit;

  try {
    std::vector<io::network::Endpoint> endpoints;
    for (int i = 0; i < FLAGS_cluster_size; ++i) {
      uint16_t port = FLAGS_port + i;
      io::network::Endpoint endpoint{FLAGS_address, port};
      endpoints.push_back(endpoint);
    }

    std::chrono::milliseconds retry_delay(1000);
    communication::ClientContext context(FLAGS_use_ssl);
    communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
                                         FLAGS_password, FLAGS_num_retries,
                                         retry_delay);

    if (FLAGS_step == "create") {
      client.Execute("create constraint on (n:Node) assert n.prop is unique",
                     {});
      return 0;
    } else if (FLAGS_step == "drop") {
      client.Execute("drop constraint on (n:Node) assert n.prop is unique", {});
      return 0;
    } else if (FLAGS_step == "add_node") {
      client.Execute(
          fmt::format("create (:Node{{prop:{}}})", FLAGS_property_value), {});

      if (FLAGS_expected_status == 0) {
        return 0;
      } else {
        LOG(WARNING) << "Query execution should've fail but it didn't.";
      }

    } else if (FLAGS_step == "check") {
      auto result = client.Execute("match (n) return n", {});

      if (result.records.size() != FLAGS_expected_result) {
        LOG(WARNING) << "Unexpected number of nodes: "
                     << "expected " << FLAGS_expected_result << ", got "
                     << result.records.size();
        return 2;
      }
      return 0;

    } else {
      LOG(FATAL) << "Unexpected client step!";
    }
  } catch (const communication::bolt::ClientQueryException &e) {
    // Sometimes we expect the query to fail, so we need to handle this as
    // success.
    if (FLAGS_expected_status == 0) {
      LOG(WARNING) << "There was some transient error during query execution.";
    } else {
      LOG(INFO) << "Query execution failed as expected, message: " << e.what();
      return 0;
    }
  } catch (const communication::bolt::ClientFatalException &) {
    LOG(WARNING) << "Failed to communicate with the leader.";
  } catch (const utils::BasicException &) {
    LOG(WARNING) << "Error while executing query.";
  }

  return 1;
}

@ -1,120 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
|
||||
class HaTestBase:
|
||||
def __init__(self, memgraph_binary, tester_binary, raft_config_file,
|
||||
cluster_size):
|
||||
|
||||
self.workers = [None for worker in range(cluster_size)]
|
||||
self.memgraph_binary = memgraph_binary
|
||||
self.tester_binary = tester_binary
|
||||
self.raft_config_file = raft_config_file
|
||||
self.cluster_size = cluster_size
|
||||
|
||||
# Get a temporary directory used for durability.
|
||||
self._tempdir = tempfile.TemporaryDirectory()
|
||||
|
||||
# generate coordination config file
|
||||
self.coordination_config_file = tempfile.NamedTemporaryFile()
|
||||
coordination = self._generate_json_coordination_config()
|
||||
self.coordination_config_file.write(bytes(coordination, "UTF-8"))
|
||||
self.coordination_config_file.flush()
|
||||
|
||||
self.execute()
|
||||
|
||||
|
||||
def __del__(self):
|
||||
self.destroy_cluster()
|
||||
|
||||
|
||||
def start_cluster(self):
|
||||
for worker_id in range(self.cluster_size):
|
||||
self.start_worker(worker_id)
|
||||
|
||||
# allow some time for leader election
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
def destroy_cluster(self):
|
||||
for worker in self.workers:
|
||||
if worker is None: continue
|
||||
worker.kill()
|
||||
worker.wait()
|
||||
self.workers.clear()
|
||||
self.coordination_config_file.close()
|
||||
|
||||
|
||||
def kill_worker(self, worker_id):
|
||||
assert worker_id >= 0 and worker_id < self.cluster_size, \
|
||||
"Invalid worker ID {}".format(worker_id)
|
||||
assert self.workers[worker_id] is not None, \
|
||||
"Worker {} doesn't exists".format(worker_id)
|
||||
|
||||
self.workers[worker_id].kill()
|
||||
self.workers[worker_id].wait()
|
||||
self.workers[worker_id] = None
|
||||
|
||||
|
||||
def start_worker(self, worker_id):
|
||||
assert worker_id >= 0 and worker_id < self.cluster_size, \
|
||||
"Invalid worker ID {}".format(worker_id)
|
||||
assert self.workers[worker_id] is None, \
|
||||
"Worker already exists".format(worker_id)
|
||||
|
||||
self.workers[worker_id] = subprocess.Popen(self._generate_args(worker_id))
|
||||
|
||||
time.sleep(0.2)
|
||||
assert self.workers[worker_id].poll() is None, \
|
||||
"Worker{} process died prematurely!".format(worker_id)
|
||||
|
||||
self._wait_for_server(7687 + worker_id)
|
||||
|
||||
|
||||
def is_worker_alive(self, worker_id):
|
||||
assert worker_id >= 0 and worker_id < self.cluster_size, \
|
||||
"Invalid worker ID {}".format(worker_id)
|
||||
return self.workers[worker_id] is None or \
|
||||
self.workers[worker_id].poll() is None
|
||||
|
||||
|
||||
def execute(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def _wait_for_server(self, port, delay=0.1):
|
||||
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
|
||||
while subprocess.call(cmd) != 0:
|
||||
time.sleep(delay)
|
||||
time.sleep(delay)
|
||||
|
||||
|
||||
def get_durability_directory(self, worker_id):
|
||||
return os.path.join(self._tempdir.name, "worker" + str(worker_id))
|
||||
|
||||
|
||||
def _generate_args(self, worker_id):
|
||||
args = [self.memgraph_binary]
|
||||
args.extend(["--server_id", str(worker_id + 1)])
|
||||
args.extend(["--bolt-port", str(7687 + worker_id)])
|
||||
args.extend(["--raft_config_file", self.raft_config_file])
|
||||
args.extend(["--coordination_config_file",
|
||||
self.coordination_config_file.name])
|
||||
|
||||
# Each worker must have a unique durability directory.
|
||||
args.extend(["--durability_directory",
|
||||
self.get_durability_directory(worker_id)])
|
||||
return args
|
||||
|
||||
|
||||
def _generate_json_coordination_config(self):
|
||||
data = []
|
||||
for i in range(self.cluster_size):
|
||||
data.append([i + 1, "127.0.0.1", 10000 + i])
|
||||
return json.dumps(data)
|
||||
|
@ -1,6 +0,0 @@
|
||||
set(target_name memgraph__integration__ha_index)
|
||||
set(tester_target_name ${target_name}__tester)
|
||||
|
||||
add_executable(${tester_target_name} tester.cpp)
|
||||
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
|
||||
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 200,
|
||||
"election_timeout_max": 500,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
@ -1,86 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaIndexTest(HaTestBase):
|
||||
def execute_step(self, step):
|
||||
if step == "create":
|
||||
print("Executing create query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "create",
|
||||
"--cluster_size", str(self.cluster_size)])
|
||||
|
||||
elif step == "check":
|
||||
print("Executing check query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "check",
|
||||
"--cluster_size", str(self.cluster_size)])
|
||||
else:
|
||||
return 0
|
||||
|
||||
# Check what happened with query execution.
|
||||
try:
|
||||
code = client.wait(timeout=30)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
print("HA client timed out!")
|
||||
client.kill()
|
||||
return 1
|
||||
|
||||
return code
|
||||
|
||||
|
||||
def execute(self):
|
||||
self.start_cluster()
|
||||
|
||||
assert self.execute_step("create") == 0, \
|
||||
"Error while executing create query"
|
||||
|
||||
for i in range(self.cluster_size):
|
||||
# Kill worker.
|
||||
print("Killing worker {}".format(i + 1))
|
||||
self.kill_worker(i)
|
||||
|
||||
assert self.execute_step("check") == 0, \
|
||||
"Error while executing check query"
|
||||
|
||||
# Bring worker back to life.
|
||||
print("Starting worker {}".format(i + 1))
|
||||
self.start_worker(i)
|
||||
|
||||
|
||||
def find_correct_path(path):
|
||||
return os.path.join(PROJECT_DIR, "build", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
memgraph_binary = find_correct_path("memgraph_ha")
|
||||
tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
|
||||
"index", "tester"))
|
||||
|
||||
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
|
||||
"index", "raft.json")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--memgraph", default=memgraph_binary)
|
||||
parser.add_argument("--raft_config_file", default=raft_config_file)
|
||||
args = parser.parse_args()
|
||||
|
||||
for cluster_size in [3, 5]:
|
||||
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
|
||||
HaIndexTest(
|
||||
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
|
||||
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
|
||||
|
||||
sys.exit(0)
|
@ -1,75 +0,0 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <fmt/format.h>
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
DEFINE_string(step, "", "The step to execute (available: create, check)");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
using communication::bolt::Value;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
const std::string index = ":Node(id)";
|
||||
|
||||
communication::SSLInit sslInit;
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i)
|
||||
endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 25, retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute(fmt::format("CREATE INDEX ON {}", index), {});
|
||||
return 0;
|
||||
} else if (FLAGS_step == "check") {
|
||||
auto result = client.Execute("SHOW INDEX INFO", {});
|
||||
auto checker = [&index](const std::vector<Value> &record) {
|
||||
if (record.size() != 1) return false;
|
||||
return record[0].ValueString() == index;
|
||||
};
|
||||
|
||||
// Check that index ":Node(id)" exists
|
||||
if (!std::any_of(result.records.begin(), result.records.end(), checker)) {
|
||||
LOG(WARNING) << "Missing index!";
|
||||
return 2;
|
||||
}
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(WARNING)
|
||||
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
|
||||
<< e.what();
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
LOG(WARNING) << "Couldn't connect to server\n" << e.what();
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query\n" << e.what();
|
||||
}
|
||||
|
||||
// The test wasn't successful
|
||||
return 1;
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
set(target_name memgraph__integration__ha_large_log_entries)
|
||||
set(tester_target_name ${target_name}__tester)
|
||||
|
||||
add_executable(${tester_target_name} tester.cpp)
|
||||
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
|
||||
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
@ -1,92 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaLargeLogEntriesTest(HaTestBase):
|
||||
def execute_step(self, step, node_count, offset_nodes=None):
|
||||
if step == "create":
|
||||
print("Executing create query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "create",
|
||||
"--cluster-size", str(self.cluster_size),
|
||||
"--create-nodes", str(node_count),
|
||||
"--offset-nodes", str(offset_nodes)])
|
||||
|
||||
elif step == "check":
|
||||
print("Executing check query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "check",
|
||||
"--cluster_size", str(self.cluster_size),
|
||||
"--check_nodes", str(node_count)])
|
||||
else:
|
||||
return 0
|
||||
|
||||
# Check what happened with query execution.
|
||||
try:
|
||||
code = client.wait(timeout=120)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
print("HA client timed out!")
|
||||
client.kill()
|
||||
return 1
|
||||
|
||||
return code
|
||||
|
||||
|
||||
def execute(self):
|
||||
# Number of nodes to be created in a single batch
|
||||
nodes = 250000
|
||||
|
||||
self.start_cluster()
|
||||
|
||||
for i in range(self.cluster_size):
|
||||
assert self.execute_step("create", nodes, i * nodes) == 0, \
|
||||
"Error while executing create query"
|
||||
|
||||
# Kill worker.
|
||||
print("Killing worker {}".format(i + 1))
|
||||
self.kill_worker(i)
|
||||
|
||||
assert self.execute_step("check", (i + 1) * nodes) == 0, \
|
||||
"Error while executing check query"
|
||||
|
||||
# Bring worker back to life.
|
||||
print("Starting worker {}".format(i + 1))
|
||||
self.start_worker(i)
|
||||
|
||||
|
||||
def find_correct_path(path):
|
||||
return os.path.join(PROJECT_DIR, "build", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
memgraph_binary = find_correct_path("memgraph_ha")
|
||||
tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
|
||||
"large_log_entries", "tester"))
|
||||
|
||||
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
|
||||
"large_log_entries", "raft.json")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--memgraph", default=memgraph_binary)
|
||||
parser.add_argument("--raft_config_file", default=raft_config_file)
|
||||
args = parser.parse_args()
|
||||
|
||||
for cluster_size in [3, 5]:
|
||||
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
|
||||
HaLargeLogEntriesTest(
|
||||
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
|
||||
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
|
||||
|
||||
sys.exit(0)
|
@ -1,72 +0,0 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_int32(create_nodes, 250000, "Number of nodes to be created.");
|
||||
DEFINE_int32(offset_nodes, 0, "Initial ID of created nodes");
|
||||
DEFINE_int32(check_nodes, -1, "Number of nodes that should be in the database");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
DEFINE_string(step, "", "The step to execute (available: create, check)");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
communication::SSLInit sslInit;
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i)
|
||||
endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 60, retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute("UNWIND RANGE($start, $stop) AS x CREATE(:Node {id: x})",
|
||||
{{"start", FLAGS_offset_nodes},
|
||||
{"stop", FLAGS_offset_nodes + FLAGS_create_nodes - 1}});
|
||||
return 0;
|
||||
} else if (FLAGS_step == "check") {
|
||||
auto result = client.Execute("MATCH (n) RETURN COUNT(n)", {});
|
||||
if (result.records[0][0].ValueInt() != FLAGS_check_nodes) {
|
||||
LOG(WARNING) << "Wrong number of nodes! Got " +
|
||||
std::to_string(result.records[0][0].ValueInt()) +
|
||||
", but expected " +
|
||||
std::to_string(FLAGS_check_nodes);
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(WARNING)
|
||||
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
|
||||
<< e.what();
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
LOG(WARNING) << "Couldn't connect to server\n" << e.what();
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query\n" << e.what();
|
||||
}
|
||||
|
||||
// The test wasn't successful
|
||||
return 1;
|
||||
}
|
@ -1,6 +0,0 @@
|
||||
set(target_name memgraph__integration__ha_leader_election)
|
||||
set(tester_target_name ${target_name}__tester)
|
||||
|
||||
add_executable(${tester_target_name} tester.cpp)
|
||||
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
|
||||
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
@ -1,97 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
This test checks the correctness of the leader election process when it's decoupled
|
||||
from log replication. In other words, in this test we do not change the state of
|
||||
the database, i.e., the Raft log remains empty.
|
||||
|
||||
The test proceeds as follows for clusters of size 3 and 5:
|
||||
1) Start a random subset of workers in the cluster
|
||||
2) Check if the leader has been elected
|
||||
3) Kill all living workers
|
||||
4) GOTO 1) and repeat 10 times
|
||||
|
||||
Naturally, throughout the process we keep track of the number of alive workers and,
|
||||
based on that number (majority or not), we decide whether the leader should have
|
||||
been elected.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
def random_subset(l):
|
||||
return random.sample(l, random.randint(0, len(l) - 1))
|
||||
|
||||
|
||||
class HaLeaderElectionTest(HaTestBase):
|
||||
def leader_check(self, has_majority):
|
||||
client = subprocess.Popen([self.tester_binary,
|
||||
"--has-majority", str(int(has_majority)),
|
||||
"--cluster-size", str(int(cluster_size))])
|
||||
|
||||
# Check what happened with query execution.
|
||||
try:
|
||||
code = client.wait(timeout=60)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
print("Error! client timeout expired.")
|
||||
client.kill()
|
||||
return 1
|
||||
|
||||
return code
|
||||
|
||||
|
||||
def execute(self):
|
||||
for i in range(10):
|
||||
# Wake up a random subset of dead nodes
|
||||
alive = random_subset(range(0, self.cluster_size))
|
||||
for wid in alive:
|
||||
print("Starting worker {}".format(wid + 1))
|
||||
self.start_worker(wid)
|
||||
|
||||
# Check if leader is elected
|
||||
assert self.leader_check(2 * len(alive) > self.cluster_size) == 0, \
|
||||
"Error while trying to find leader"
|
||||
|
||||
# Kill living nodes
|
||||
for wid in alive:
|
||||
print("Killing worker {}".format(wid + 1))
|
||||
self.kill_worker(wid)
|
||||
|
||||
self.destroy_cluster()
|
||||
|
||||
|
||||
def find_correct_path(path):
|
||||
return os.path.join(PROJECT_DIR, "build", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
memgraph_binary = find_correct_path("memgraph_ha")
|
||||
tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
|
||||
"leader_election", "tester"))
|
||||
|
||||
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
|
||||
"leader_election", "raft.json")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--memgraph", default=memgraph_binary)
|
||||
parser.add_argument("--raft_config_file", default=raft_config_file)
|
||||
args = parser.parse_args()
|
||||
|
||||
for cluster_size in [3, 5]:
|
||||
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
|
||||
HaLeaderElectionTest(
|
||||
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
|
||||
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
|
||||
|
||||
sys.exit(0)
|
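The expected outcome of each leader_check() call above follows from the Raft majority rule; here is a standalone sketch of that predicate (the same arithmetic as 2 * len(alive) > self.cluster_size in the deleted runner):

def leader_should_be_elected(alive_workers, cluster_size):
    # A Raft cluster can only elect a leader while a strict majority of its members is alive.
    return 2 * alive_workers > cluster_size

assert leader_should_be_elected(2, 3)        # 2 of 3 alive -> a leader is expected
assert not leader_should_be_elected(2, 5)    # 2 of 5 alive -> no leader can be elected
assert not leader_should_be_elected(1, 3)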
@ -1,70 +0,0 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_int32(has_majority, 0, "Should we be able to elect the leader.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
communication::SSLInit sslInit;
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i)
|
||||
endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 15, retry_delay);
|
||||
|
||||
auto leader = client.GetLeaderId();
|
||||
if (!FLAGS_has_majority) {
|
||||
LOG(WARNING)
|
||||
<< "The majority of cluster is dead but we have elected server "
|
||||
<< std::to_string(leader) << " as a leader";
|
||||
return 1;
|
||||
} else {
|
||||
LOG(INFO) << "Server " << std::to_string(leader)
|
||||
<< " was successfully elected as a leader";
|
||||
return 0;
|
||||
}
|
||||
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
if (FLAGS_has_majority) {
|
||||
LOG(WARNING)
|
||||
<< "The majority of cluster is alive but the leader was not elected.";
|
||||
return 1;
|
||||
} else {
|
||||
LOG(INFO)
|
||||
<< "The has_majority of cluster is dead and no leader was elected.";
|
||||
return 0;
|
||||
}
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(WARNING)
|
||||
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
|
||||
<< e.what();
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query\n" << e.what();
|
||||
}
|
||||
|
||||
// The test wasn't successful
|
||||
return 1;
|
||||
}
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 200,
|
||||
"election_timeout_max": 500,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": 100
|
||||
}
|
@ -1,104 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import os
|
||||
import time
|
||||
import subprocess
|
||||
import sys
|
||||
import random
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
|
||||
def shasum(filename):
|
||||
with open(filename, 'rb') as f:
|
||||
return hashlib.sha1(f.read()).hexdigest()
|
||||
|
||||
raise Exception("Couldn't get shasum for file {}".format(filename))
|
||||
|
||||
|
||||
class HaLogCompactionTest(HaTestBase):
|
||||
def execute_step(self, query):
|
||||
client = subprocess.Popen(
|
||||
[self.tester_binary, "--cluster_size", str(self.cluster_size)],
|
||||
stdin=subprocess.PIPE, stdout=subprocess.DEVNULL)
|
||||
|
||||
try:
|
||||
client.communicate(input=bytes(query, "UTF-8"), timeout=30)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
client.kill()
|
||||
client.communicate()
|
||||
return 1
|
||||
|
||||
return client.returncode
|
||||
|
||||
|
||||
def get_snapshot_path(self, worker_id):
|
||||
dur = os.path.join(self.get_durability_directory(worker_id), "snapshots")
|
||||
snapshots = os.listdir(dur)
|
||||
|
||||
assert len(snapshots) == 1, \
|
||||
"More than one snapshot on worker {}!".format(worker_id + 1)
|
||||
return os.path.join(dur, snapshots[0])
|
||||
|
||||
|
||||
def execute(self):
|
||||
# custom cluster startup
|
||||
for worker_id in range(1, self.cluster_size):
|
||||
self.start_worker(worker_id)
|
||||
|
||||
time.sleep(5)
|
||||
assert self.execute_step("CREATE (:Node)\n" * 128) == 0, \
|
||||
"Error while executing create query"
|
||||
|
||||
self.start_worker(0)
|
||||
|
||||
# allow some time for the snapshot transfer
|
||||
time.sleep(5)
|
||||
snapshot_shasum = shasum(self.get_snapshot_path(0))
|
||||
|
||||
success = False
|
||||
for worker_id in range(1, self.cluster_size):
|
||||
if shasum(self.get_snapshot_path(worker_id)) == snapshot_shasum:
|
||||
success = True
|
||||
break
|
||||
|
||||
# Check if the cluster is alive
|
||||
for worker_id in range(self.cluster_size):
|
||||
assert self.is_worker_alive(worker_id), \
|
||||
"Worker {} died prematurely".format(worker_id + 1)
|
||||
|
||||
assert success, "Snapshot didn't transfer successfully"
|
||||
|
||||
|
||||
def find_correct_path(path):
|
||||
return os.path.join(PROJECT_DIR, "build", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
memgraph_binary = find_correct_path("memgraph_ha")
|
||||
tester_binary = find_correct_path(os.path.join("tests", "manual",
|
||||
"ha_client"))
|
||||
|
||||
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
|
||||
"log_compaction", "raft.json")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--memgraph", default=memgraph_binary)
|
||||
parser.add_argument("--raft_config_file", default=raft_config_file)
|
||||
args = parser.parse_args()
|
||||
|
||||
for cluster_size in [3, 5]:
|
||||
print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m" % (cluster_size))
|
||||
HaLogCompactionTest(
|
||||
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
|
||||
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
|
||||
|
||||
sys.exit(0)
|
@ -1,6 +0,0 @@
|
||||
set(target_name memgraph__integration__ha_term_updates)
|
||||
set(tester_target_name ${target_name}__tester)
|
||||
|
||||
add_executable(${tester_target_name} tester.cpp)
|
||||
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
|
||||
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
@ -1,7 +0,0 @@
|
||||
{
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaTermUpdatesTest(HaTestBase):
|
||||
def execute_step(self, step, expected_results=None):
|
||||
if step == "create":
|
||||
print("Executing create query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "create",
|
||||
"--cluster_size", str(self.cluster_size)])
|
||||
|
||||
elif step == "count":
|
||||
print("Executing count query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "count",
|
||||
"--cluster_size", str(self.cluster_size), "--expected_results",
|
||||
str(expected_results)])
|
||||
else:
|
||||
raise ValueError("Invalid step argument: " + step)
|
||||
|
||||
# Check what happened with query execution.
|
||||
try:
|
||||
code = client.wait(timeout=30)
|
||||
except subprocess.TimeoutExpired as e:
|
||||
print("Client timed out!")
|
||||
client.kill()
|
||||
return False
|
||||
|
||||
return code == 0
|
||||
|
||||
def find_leader(self):
|
||||
client = subprocess.run([self.tester_binary,
|
||||
"--step", "find_leader",
|
||||
"--cluster_size", str(self.cluster_size)],
|
||||
stdout=subprocess.PIPE, check=True)
|
||||
return int(client.stdout.decode('utf-8')) - 1
|
||||
|
||||
def execute(self):
|
||||
self.start_cluster() # start a whole cluster from scratch
|
||||
leader_id = self.find_leader()
|
||||
follower_ids = list(set(range(self.cluster_size)) - {leader_id})
|
||||
|
||||
# Kill all followers.
|
||||
for i in follower_ids:
|
||||
print("Killing worker {}".format(i))
|
||||
self.kill_worker(i)
|
||||
|
||||
# Try to execute a 'CREATE (n)' query on the leader.
|
||||
# The query hangs because the leader doesn't have consensus.
|
||||
assert not self.execute_step("create"), \
|
||||
"Error - a non-majorty cluster managed to execute a query"
|
||||
|
||||
# Start a follower to create consensus so that the create succeeds.
|
||||
print("Starting worker {}".format(follower_ids[0]))
|
||||
self.start_worker(follower_ids[0])
|
||||
self.find_leader() # wait for leader re-election
|
||||
assert self.execute_step("count", expected_results=1), \
|
||||
"Error while executing count query"
|
||||
|
||||
# Kill the leader.
|
||||
print("Killing leader (machine {})".format(leader_id))
|
||||
self.kill_worker(leader_id)
|
||||
time.sleep(1)
|
||||
|
||||
# Start the second follower to create a consensus with the first
|
||||
# follower so that the first follower may become the new leader.
|
||||
print("Starting worker {}".format(follower_ids[1]))
|
||||
self.start_worker(follower_ids[1])
|
||||
self.find_leader() # wait for leader re-election
|
||||
|
||||
# Verify that the data is there -> connect to the new leader and execute
|
||||
# "MATCH (n) RETURN n" -> the data should be there.
|
||||
assert self.execute_step("count", expected_results=1), \
|
||||
"Error while executing count query"
|
||||
|
||||
# Try to assemble the whole cluster again by returning the old leader
|
||||
# to the cluster as a fresh machine.
|
||||
# (start the machine with its durability directory previously removed)
|
||||
shutil.rmtree(self.get_durability_directory(leader_id))
|
||||
self.start_worker(leader_id)
|
||||
self.find_leader() # wait for leader re-election
|
||||
assert self.execute_step("count", expected_results=1), \
|
||||
"Error while executing count query"
|
||||
|
||||
|
||||
def find_correct_path(path):
|
||||
return os.path.join(PROJECT_DIR, "build", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
memgraph_binary = find_correct_path("memgraph_ha")
|
||||
tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
|
||||
"term_updates", "tester"))
|
||||
|
||||
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
|
||||
"term_updates", "raft.json")
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--memgraph", default=memgraph_binary)
|
||||
parser.add_argument("--raft_config_file", default=raft_config_file)
|
||||
parser.add_argument("--username", default="")
|
||||
parser.add_argument("--password", default="")
|
||||
parser.add_argument("--encrypted", type=bool, default=False)
|
||||
parser.add_argument("--address", type=str,
|
||||
default="bolt://127.0.0.1")
|
||||
parser.add_argument("--port", type=int,
|
||||
default=7687)
|
||||
args = parser.parse_args()
|
||||
|
||||
cluster_size = 3 # test only works for 3 nodes
|
||||
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m"
|
||||
% (cluster_size))
|
||||
HaTermUpdatesTest(
|
||||
args.memgraph, tester_binary, args.raft_config_file, cluster_size)
|
||||
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
|
||||
|
||||
sys.exit(0)
|
@ -1,77 +0,0 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_int32(expected_results, -1, "Number of expected nodes.");
|
||||
DEFINE_int32(num_retries, 20, "Number of (leader) execution retries.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
|
||||
DEFINE_string(step, "create", "The step to execute (available: create, count)");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
communication::SSLInit sslInit;
|
||||
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints;
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
endpoints.push_back(endpoint);
|
||||
}
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, FLAGS_num_retries,
|
||||
retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute("create (:Node)", {});
|
||||
return 0;
|
||||
|
||||
} else if (FLAGS_step == "count") {
|
||||
auto result = client.Execute("match (n) return n", {});
|
||||
|
||||
if (result.records.size() != FLAGS_expected_results) {
|
||||
LOG(WARNING) << "Unexpected number of nodes: "
|
||||
<< "expected " << FLAGS_expected_results << ", got "
|
||||
<< result.records.size();
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
|
||||
} else if (FLAGS_step == "find_leader") {
|
||||
std::cout << client.GetLeaderId();
|
||||
return 0;
|
||||
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
LOG(WARNING) << "There was some transient error during query execution.";
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
LOG(WARNING) << "Failed to communicate with the leader.";
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query.";
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
41
tests/setup.sh
Executable file
@ -0,0 +1,41 @@
|
||||
#!/bin/bash
|
||||
|
||||
# shellcheck disable=1091
|
||||
set -Eeuo pipefail
|
||||
|
||||
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
|
||||
PIP_DEPS=(
|
||||
"behave==1.2.6"
|
||||
"ldap3==2.6"
|
||||
"neo4j-driver==4.1.1"
|
||||
"parse==1.18.0"
|
||||
"parse-type==0.5.2"
|
||||
"pyyaml==5.3.1"
|
||||
"six==1.15.0"
|
||||
)
|
||||
cd "$DIR"
|
||||
|
||||
# Remove old virtualenv.
|
||||
if [ -d ve3 ]; then
|
||||
rm -rf ve3
|
||||
fi
|
||||
|
||||
# Create new virtualenv.
|
||||
virtualenv -p python3 ve3
|
||||
set +u
|
||||
source "ve3/bin/activate"
|
||||
set -u
|
||||
|
||||
for pkg in "${PIP_DEPS[@]}"; do
|
||||
pip --timeout 1000 install "$pkg"
|
||||
done
|
||||
|
||||
# Build and install pymgclient from source to have full control over the mgclient it links against.
|
||||
pushd "$DIR/../libs/pymgclient" > /dev/null
|
||||
export MGCLIENT_INCLUDE_DIR="$DIR/../libs/mgclient/include"
|
||||
export MGCLIENT_LIB_DIR="$DIR/../libs/mgclient/lib"
|
||||
python3 setup.py build
|
||||
python3 setup.py install
|
||||
popd > /dev/null
|
||||
|
||||
deactivate
|
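A minimal smoke test for the virtualenv produced by setup.sh, assuming a Memgraph instance is already listening on 127.0.0.1:7687 and that the snippet runs with ve3 activated (if libmgclient is linked dynamically, its lib directory also has to be visible to the dynamic loader):

import mgclient  # installed into ve3 by the pymgclient build above

# Connection parameters are placeholders; point them at the instance under test.
connection = mgclient.connect(host="127.0.0.1", port=7687)
connection.autocommit = True
cursor = connection.cursor()
cursor.execute("RETURN 1;")
print(cursor.fetchall())  # a single row containing 1 confirms the client stack works
connection.close()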
@ -41,7 +41,7 @@ int main(int argc, char **argv) {
|
||||
mg_session_params_set_username(params, FLAGS_username.c_str());
|
||||
mg_session_params_set_password(params, FLAGS_password.c_str());
|
||||
}
|
||||
mg_session_params_set_client_name(params, bolt_client_version.c_str());
|
||||
mg_session_params_set_user_agent(params, bolt_client_version.c_str());
|
||||
mg_session_params_set_sslmode(
|
||||
params, FLAGS_use_ssl ? MG_SSLMODE_REQUIRE : MG_SSLMODE_DISABLE);
|
||||
|
||||
@ -56,7 +56,14 @@ int main(int argc, char **argv) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mg_session_run(session, "DUMP DATABASE", nullptr, nullptr) < 0) {
|
||||
if (mg_session_run(session, "DUMP DATABASE", nullptr, nullptr, nullptr,
|
||||
nullptr) < 0) {
|
||||
std::cerr << "Execution failed: " << mg_session_error(session) << std::endl;
|
||||
mg_session_destroy(session);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (mg_session_pull(session, nullptr) < 0) {
|
||||
std::cerr << "Execution failed: " << mg_session_error(session) << std::endl;
|
||||
mg_session_destroy(session);
|
||||
return 1;
|
||||
@ -64,7 +71,7 @@ int main(int argc, char **argv) {
|
||||
|
||||
// Fetch results
|
||||
mg_result *result;
|
||||
while ((status = mg_session_pull(session, &result)) == 1) {
|
||||
while ((status = mg_session_fetch(session, &result)) == 1) {
|
||||
const mg_list *row = mg_result_row(result);
|
||||
CHECK(mg_list_size(row) == 1)
|
||||
<< "Error: dump client received data in unexpected format";
|
||||
|