Add Raft term updates integration test
Reviewers: ipaljak, msantl, mferencevic Reviewed By: ipaljak, msantl, mferencevic Subscribers: mferencevic, pullbot Differential Revision: https://phabricator.memgraph.io/D2002
This commit is contained in:
parent
88778fc70f
commit
81a7a7b600
@ -22,6 +22,9 @@ add_subdirectory(ha/basic)
|
||||
# distributed ha/index binaries
add_subdirectory(ha/index)

# distributed ha/term_updates binaries
add_subdirectory(ha/term_updates)

# audit test binaries
add_subdirectory(audit)
|
||||
|
||||
|
@ -101,3 +101,13 @@
|
||||
- ../ha_test.py # raft test base module
|
||||
- ../../../../build_debug/memgraph_ha # memgraph ha binary
|
||||
- ../../../../build_debug/tests/manual/ha_client # tester binary
|
||||
|
||||
- name: integration__ha_term_updates
  cd: ha/term_updates
  commands: ./runner.py
  infiles:
    - runner.py # runner script
    - raft.json # raft configuration
    - ../ha_test.py # raft test base module
    - ../../../../build_debug/memgraph_ha # memgraph ha binary
    - ../../../../build_debug/tests/integration/ha/term_updates/tester # tester binary
|
||||
|
6
tests/integration/ha/term_updates/CMakeLists.txt
Normal file
6
tests/integration/ha/term_updates/CMakeLists.txt
Normal file
@ -0,0 +1,6 @@
|
||||
# Target names for the HA term-updates integration test.
set(target_name memgraph__integration__ha_term_updates)
set(tester_target_name ${target_name}__tester)

# Tester client binary; emitted as plain "tester" so the runner can find
# it at a stable path under the build directory.
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
7
tests/integration/ha/term_updates/raft.json
Normal file
7
tests/integration/ha/term_updates/raft.json
Normal file
@ -0,0 +1,7 @@
|
||||
{
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
134
tests/integration/ha/term_updates/runner.py
Executable file
134
tests/integration/ha/term_updates/runner.py
Executable file
@ -0,0 +1,134 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
# Directory containing this script, and the repository root (four levels up).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory so the shared HA test base module can be imported
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaTermUpdatesTest(HaTestBase):
    """Integration test for Raft term updates.

    Repeatedly kills and restarts cluster members and verifies that a
    written node survives leader re-elections (requires a 3-node
    cluster; worker/leader management comes from HaTestBase).
    """

    def execute_step(self, step, expected_results=None):
        """Run the tester binary for a single test step.

        Args:
            step: "create" (execute a single CREATE query) or "count"
                (count nodes and compare against ``expected_results``).
            expected_results: expected node count; only used by "count".

        Returns:
            True if the tester exited with code 0 within 30 seconds,
            False on a non-zero exit code or a timeout.

        Raises:
            ValueError: if ``step`` is not "create" or "count".
        """
        if step == "create":
            print("Executing create query")
            client = subprocess.Popen([self.tester_binary, "--step", "create",
                    "--cluster_size", str(self.cluster_size)])

        elif step == "count":
            print("Executing count query")
            client = subprocess.Popen([self.tester_binary, "--step", "count",
                    "--cluster_size", str(self.cluster_size),
                    "--expected_results", str(expected_results)])
        else:
            raise ValueError("Invalid step argument: " + step)

        # Check what happened with query execution.
        try:
            code = client.wait(timeout=30)
        except subprocess.TimeoutExpired:
            # A hanging client is expected when the cluster has no
            # consensus; kill it and report failure.
            print("Client timed out!")
            client.kill()
            return False

        return code == 0

    def find_leader(self):
        """Return the current leader's zero-based worker id.

        The tester binary reports a one-based leader id on stdout; it is
        converted here so it can index workers directly.
        """
        client = subprocess.run([self.tester_binary,
                                 "--step", "find_leader",
                                 "--cluster_size", str(self.cluster_size)],
                                stdout=subprocess.PIPE, check=True)
        return int(client.stdout.decode('utf-8')) - 1

    def execute(self):
        """Drive the full term-update scenario on a fresh cluster."""
        self.start_cluster()  # start a whole cluster from scratch
        leader_id = self.find_leader()
        follower_ids = list(set(range(self.cluster_size)) - {leader_id})

        # Kill all followers.
        for i in follower_ids:
            print("Killing worker {}".format(i))
            self.kill_worker(i)

        # Try to execute a 'CREATE (n)' query on the leader.
        # The query hangs because the leader doesn't have consensus.
        assert not self.execute_step("create"), \
            "Error - a non-majority cluster managed to execute a query"

        # Start a follower to create consensus so that the create succeeds.
        print("Starting worker {}".format(follower_ids[0]))
        self.start_worker(follower_ids[0])
        self.find_leader()  # wait for leader re-election
        assert self.execute_step("count", expected_results=1), \
            "Error while executing count query"

        # Kill the leader.
        print("Killing leader (machine {})".format(leader_id))
        self.kill_worker(leader_id)
        time.sleep(1)

        # Start the second follower to create a consensus with the first
        # follower so that the first follower may become the new leader.
        print("Starting worker {}".format(follower_ids[1]))
        self.start_worker(follower_ids[1])
        self.find_leader()  # wait for leader re-election

        # Verify that the data is there -> connect to the new leader and execute
        # "MATCH (n) RETURN n" -> the data should be there.
        assert self.execute_step("count", expected_results=1), \
            "Error while executing count query"

        # Try to assemble the whole cluster again by returning the old leader
        # to the cluster as a fresh machine.
        # (start the machine with its durability directory previously removed)
        shutil.rmtree(self.get_durability_directory(leader_id))
        self.start_worker(leader_id)
        self.find_leader()  # wait for leader re-election
        assert self.execute_step("count", expected_results=1), \
            "Error while executing count query"
|
||||
|
||||
|
||||
def find_correct_path(path):
    """Resolve ``path`` under the build tree.

    Prefers the "build" directory; falls back to "build_debug" when the
    primary candidate does not exist.
    """
    candidate = os.path.join(PROJECT_DIR, "build", path)
    if os.path.exists(candidate):
        return candidate
    return os.path.join(PROJECT_DIR, "build_debug", path)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Resolve the freshly built binaries and the raft configuration
    # shipped next to this script.
    memgraph_default = find_correct_path("memgraph_ha")
    tester_default = find_correct_path(
        os.path.join("tests", "integration", "ha", "term_updates", "tester"))
    raft_cfg_default = os.path.join(
        PROJECT_DIR, "tests", "integration", "ha", "term_updates", "raft.json")

    # Command line flags; the defaults point at the build artifacts above.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("--memgraph", default=memgraph_default)
    arg_parser.add_argument("--raft_config_file", default=raft_cfg_default)
    arg_parser.add_argument("--username", default="")
    arg_parser.add_argument("--password", default="")
    arg_parser.add_argument("--encrypted", type=bool, default=False)
    arg_parser.add_argument("--address", type=str, default="bolt://127.0.0.1")
    arg_parser.add_argument("--port", type=int, default=7687)
    args = arg_parser.parse_args()

    cluster_size = 3  # test only works for 3 nodes
    print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m"
          % cluster_size)
    HaTermUpdatesTest(
        args.memgraph, tester_default, args.raft_config_file, cluster_size)
    print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)
|
77
tests/integration/ha/term_updates/tester.cpp
Normal file
77
tests/integration/ha/term_updates/tester.cpp
Normal file
@ -0,0 +1,77 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_int32(expected_results, -1, "Number of expected nodes.");
|
||||
DEFINE_int32(num_retries, 20, "Number of (leader) execution retries.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
|
||||
DEFINE_string(step, "create", "The step to execute (available: create, count)");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
communication::Init();
|
||||
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints;
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i) {
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
endpoints.push_back(endpoint);
|
||||
}
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, FLAGS_num_retries,
|
||||
retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute("create (:Node)", {});
|
||||
return 0;
|
||||
|
||||
} else if (FLAGS_step == "count") {
|
||||
auto result = client.Execute("match (n) return n", {});
|
||||
|
||||
if (result.records.size() != FLAGS_expected_results) {
|
||||
LOG(WARNING) << "Unexpected number of nodes: "
|
||||
<< "expected " << FLAGS_expected_results
|
||||
<< ", got " << result.records.size();
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
|
||||
} else if (FLAGS_step == "find_leader") {
|
||||
std::cout << client.GetLeaderId();
|
||||
return 0;
|
||||
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
LOG(WARNING) << "There was some transient error during query execution.";
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
LOG(WARNING) << "Failed to communicate with the leader.";
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query.";
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
Loading…
Reference in New Issue
Block a user