diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index e786c5806..47f89ad01 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -22,6 +22,9 @@ add_subdirectory(ha/basic)
 # distributed ha/index binaries
 add_subdirectory(ha/index)
 
+# distributed ha/term_updates binaries
+add_subdirectory(ha/term_updates)
+
 # audit test binaries
 add_subdirectory(audit)
 
diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml
index 6a8e47aea..60dc7f887 100644
--- a/tests/integration/apollo_runs.yaml
+++ b/tests/integration/apollo_runs.yaml
@@ -101,3 +101,13 @@
     - ../ha_test.py # raft test base module
     - ../../../../build_debug/memgraph_ha # memgraph ha binary
     - ../../../../build_debug/tests/manual/ha_client # tester binary
+
+- name: integration__ha_term_updates
+  cd: ha/term_updates
+  commands: ./runner.py
+  infiles:
+    - runner.py # runner script
+    - raft.json # raft configuration
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph ha binary
+    - ../../../../build_debug/tests/integration/ha/term_updates/tester # tester binary
diff --git a/tests/integration/ha/term_updates/CMakeLists.txt b/tests/integration/ha/term_updates/CMakeLists.txt
new file mode 100644
index 000000000..e43402228
--- /dev/null
+++ b/tests/integration/ha/term_updates/CMakeLists.txt
@@ -0,0 +1,6 @@
+set(target_name memgraph__integration__ha_term_updates)
+set(tester_target_name ${target_name}__tester)
+
+add_executable(${tester_target_name} tester.cpp)
+set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
+target_link_libraries(${tester_target_name} mg-utils mg-communication)
diff --git a/tests/integration/ha/term_updates/raft.json b/tests/integration/ha/term_updates/raft.json
new file mode 100644
index 000000000..ede2ddc62
--- /dev/null
+++ b/tests/integration/ha/term_updates/raft.json
@@ -0,0 +1,7 @@
+{
+  "election_timeout_min": 750,
+  "election_timeout_max": 1000,
+  "heartbeat_interval": 100,
+  "replication_timeout": 10000,
+  "log_size_snapshot_threshold": -1
+}
diff --git a/tests/integration/ha/term_updates/runner.py b/tests/integration/ha/term_updates/runner.py
new file mode 100755
index 000000000..d1c48e991
--- /dev/null
+++ b/tests/integration/ha/term_updates/runner.py
@@ -0,0 +1,134 @@
+#!/usr/bin/python3
+
+import argparse
+import os
+import time
+import random
+import subprocess
+import shutil
+import sys
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
+
+# Append the parent directory to sys.path so that ha_test can be imported.
+sys.path.append(os.path.join(SCRIPT_DIR, ".."))
+
+from ha_test import HaTestBase
+
+
+class HaTermUpdatesTest(HaTestBase):
+    def execute_step(self, step, expected_results=None):
+        if step == "create":
+            print("Executing create query")
+            client = subprocess.Popen([self.tester_binary, "--step", "create",
+                "--cluster_size", str(self.cluster_size)])
+
+        elif step == "count":
+            print("Executing count query")
+            client = subprocess.Popen([self.tester_binary, "--step", "count",
+                "--cluster_size", str(self.cluster_size), "--expected_results",
+                str(expected_results)])
+        else:
+            raise ValueError("Invalid step argument: " + step)
+
+        # Check what happened with the query execution.
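+        # Either a non-zero exit code or a timeout counts as a failed step.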
+        try:
+            code = client.wait(timeout=30)
+        except subprocess.TimeoutExpired:
+            print("Client timed out!")
+            client.kill()
+            return False
+
+        return code == 0
+
+    def find_leader(self):
+        client = subprocess.run([self.tester_binary,
+            "--step", "find_leader",
+            "--cluster_size", str(self.cluster_size)],
+            stdout=subprocess.PIPE, check=True)
+        # The tester prints a 1-based leader ID; convert it to a 0-based
+        # worker index.
+        return int(client.stdout.decode('utf-8')) - 1
+
+    def execute(self):
+        self.start_cluster()  # start a whole cluster from scratch
+        leader_id = self.find_leader()
+        follower_ids = list(set(range(self.cluster_size)) - {leader_id})
+
+        # Kill all followers.
+        for i in follower_ids:
+            print("Killing worker {}".format(i))
+            self.kill_worker(i)
+
+        # Try to execute a 'CREATE (:Node)' query on the leader. The query
+        # cannot commit because the leader has lost its majority, so the
+        # client times out.
+        assert not self.execute_step("create"), \
+            "Error - a non-majority cluster managed to execute a query"
+
+        # Start a follower to restore consensus; the leader can now commit
+        # the pending write, so the count query should see one node.
+        print("Starting worker {}".format(follower_ids[0]))
+        self.start_worker(follower_ids[0])
+        self.find_leader()  # wait for leader re-election
+        assert self.execute_step("count", expected_results=1), \
+            "Error while executing count query"
+
+        # Kill the leader.
+        print("Killing leader (machine {})".format(leader_id))
+        self.kill_worker(leader_id)
+        time.sleep(1)
+
+        # Start the second follower to create a consensus with the first
+        # follower so that the first follower may become the new leader.
+        print("Starting worker {}".format(follower_ids[1]))
+        self.start_worker(follower_ids[1])
+        self.find_leader()  # wait for leader re-election
+
+        # Verify that the data survived the leader change: connect to the
+        # new leader and count the nodes.
+        assert self.execute_step("count", expected_results=1), \
+            "Error while executing count query"
+
+        # Try to assemble the whole cluster again by returning the old leader
+        # to the cluster as a fresh machine.
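+        # A stale ex-leader that rejoins must observe the new, higher term
+        # from the current leader and step down to follower; this term
+        # update is the behaviour under test.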
+        # (start the machine with its durability directory previously removed)
+        shutil.rmtree(self.get_durability_directory(leader_id))
+        self.start_worker(leader_id)
+        self.find_leader()  # wait for leader re-election
+        assert self.execute_step("count", expected_results=1), \
+            "Error while executing count query"
+
+
+def find_correct_path(path):
+    f = os.path.join(PROJECT_DIR, "build", path)
+    if not os.path.exists(f):
+        f = os.path.join(PROJECT_DIR, "build_debug", path)
+    return f
+
+
+if __name__ == "__main__":
+    memgraph_binary = find_correct_path("memgraph_ha")
+    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
+                                                   "term_updates", "tester"))
+
+    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
+                                    "term_updates", "raft.json")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--memgraph", default=memgraph_binary)
+    parser.add_argument("--raft_config_file", default=raft_config_file)
+    parser.add_argument("--username", default="")
+    parser.add_argument("--password", default="")
+    parser.add_argument("--encrypted", type=bool, default=False)
+    parser.add_argument("--address", type=str, default="bolt://127.0.0.1")
+    parser.add_argument("--port", type=int, default=7687)
+    args = parser.parse_args()
+
+    cluster_size = 3  # the test scenario only works for 3 nodes
+    print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m"
+          % (cluster_size))
+    # Constructing the test object runs the whole scenario (HaTestBase
+    # invokes execute() from its constructor).
+    HaTermUpdatesTest(
+        args.memgraph, tester_binary, args.raft_config_file, cluster_size)
+    print("\033[1;32m~~ The test finished successfully ~~\033[0m")
+
+    sys.exit(0)
diff --git a/tests/integration/ha/term_updates/tester.cpp b/tests/integration/ha/term_updates/tester.cpp
new file mode 100644
index 000000000..fc5d823da
--- /dev/null
+++ b/tests/integration/ha/term_updates/tester.cpp
@@ -0,0 +1,77 @@
+#include <chrono>
+#include <iostream>
+#include <vector>
+
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "communication/bolt/ha_client.hpp"
+#include "io/network/endpoint.hpp"
+#include "io/network/utils.hpp"
+#include "utils/timer.hpp"
+
+DEFINE_string(address, "127.0.0.1", "Server address");
+DEFINE_int32(port, 7687, "Server port");
+DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
+DEFINE_int32(expected_results, -1, "Number of expected nodes.");
+DEFINE_int32(num_retries, 20, "Number of (leader) execution retries.");
+DEFINE_string(username, "", "Username for the database");
+DEFINE_string(password, "", "Password for the database");
+DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
+
+DEFINE_string(step, "create",
+              "The step to execute (available: create, count, find_leader)");
+
+using namespace std::chrono_literals;
+
+int main(int argc, char **argv) {
+  gflags::ParseCommandLineFlags(&argc, &argv, true);
+  google::InitGoogleLogging(argv[0]);
+
+  communication::Init();
+
+  try {
+    // Each cluster member listens on a consecutive port starting from
+    // FLAGS_port.
+    std::vector<io::network::Endpoint> endpoints;
+    for (int i = 0; i < FLAGS_cluster_size; ++i) {
+      uint16_t port = FLAGS_port + i;
+      io::network::Endpoint endpoint{FLAGS_address, port};
+      endpoints.push_back(endpoint);
+    }
+
+    std::chrono::milliseconds retry_delay(1000);
+    communication::ClientContext context(FLAGS_use_ssl);
+    communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
+                                         FLAGS_password, FLAGS_num_retries,
+                                         retry_delay);
+
+    if (FLAGS_step == "create") {
+      client.Execute("create (:Node)", {});
+      return 0;
+
+    } else if (FLAGS_step == "count") {
+      auto result = client.Execute("match (n) return n", {});
+
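+      // The count step returns one record per node, so the record count
+      // must match the expected number of nodes.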
+      if (static_cast<int>(result.records.size()) != FLAGS_expected_results) {
+        LOG(WARNING) << "Unexpected number of nodes: "
+                     << "expected " << FLAGS_expected_results << ", got "
+                     << result.records.size();
+        return 2;
+      }
+      return 0;
+
+    } else if (FLAGS_step == "find_leader") {
+      std::cout << client.GetLeaderId();
+      return 0;
+
+    } else {
+      LOG(FATAL) << "Unexpected client step!";
+    }
+  } catch (const communication::bolt::ClientQueryException &) {
+    LOG(WARNING) << "There was a transient error during query execution.";
+  } catch (const communication::bolt::ClientFatalException &) {
+    LOG(WARNING) << "Failed to communicate with the leader.";
+  } catch (const utils::BasicException &e) {
+    LOG(WARNING) << "Error while executing query: " << e.what();
+  }
+
+  return 1;
+}