From 59e11dbf9ff5b863b92f798d80f55a72e879005d Mon Sep 17 00:00:00 2001
From: Matija Santl
Date: Wed, 6 Feb 2019 15:40:03 +0100
Subject: [PATCH] Add Raft log compaction transfer test

Summary: I've refactored the HA integration test so we can reuse the common
parts, like starting and stopping workers.

I've also added a test that triggers log compaction and checks that the
transferred snapshot is identical to the original one.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1847
---
 tests/integration/CMakeLists.txt              |   5 +-
 tests/integration/apollo_runs.yaml            |  17 +-
 .../{ha_basic => ha/basic}/CMakeLists.txt     |   0
 .../{ha_basic => ha/basic}/raft.json          |   0
 tests/integration/ha/basic/runner.py          | 105 +++++++++
 .../{ha_basic => ha/basic}/tester.cpp         |   0
 tests/integration/ha/ha_test.py               | 109 ++++++++++
 tests/integration/ha/log_compaction/raft.json |   6 +
 tests/integration/ha/log_compaction/runner.py |  99 ++++++++
 tests/integration/ha_basic/coordination.json  |   5 -
 tests/integration/ha_basic/runner.py          | 201 ------------------
 11 files changed, 336 insertions(+), 211 deletions(-)
 rename tests/integration/{ha_basic => ha/basic}/CMakeLists.txt (100%)
 rename tests/integration/{ha_basic => ha/basic}/raft.json (100%)
 create mode 100755 tests/integration/ha/basic/runner.py
 rename tests/integration/{ha_basic => ha/basic}/tester.cpp (100%)
 create mode 100644 tests/integration/ha/ha_test.py
 create mode 100644 tests/integration/ha/log_compaction/raft.json
 create mode 100755 tests/integration/ha/log_compaction/runner.py
 delete mode 100644 tests/integration/ha_basic/coordination.json
 delete mode 100755 tests/integration/ha_basic/runner.py

diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt
index a41507925..8cd148f4e 100644
--- a/tests/integration/CMakeLists.txt
+++ b/tests/integration/CMakeLists.txt
@@ -16,5 +16,6 @@ add_subdirectory(auth)
 # distributed test binaries
 add_subdirectory(distributed)
 
-# raft test binaries
-add_subdirectory(ha_basic)
+# HA (raft) test binaries
+add_subdirectory(ha/basic)
+
diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml
index 2840a2a93..94c4470a1 100644
--- a/tests/integration/apollo_runs.yaml
+++ b/tests/integration/apollo_runs.yaml
@@ -52,10 +52,21 @@
     - ../../../build_debug/tests/integration/distributed/tester # tester binary
 
 - name: integration__ha_basic
-  cd: ha_basic
+  cd: ha/basic
   commands: ./runner.py
   infiles:
     - runner.py # runner script
     - raft.json # raft configuration
-    - ../../../build_debug/memgraph_ha # memgraph distributed binary
-    - ../../../build_debug/tests/integration/ha_basic/tester # tester binary
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph HA binary
+    - ../../../../build_debug/tests/integration/ha/basic/tester # tester binary
+
+- name: integration__ha_log_compaction
+  cd: ha/log_compaction
+  commands: ./runner.py
+  infiles:
+    - runner.py # runner script
+    - raft.json # raft configuration
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph HA binary
+    - ../../../../build_debug/tests/manual/ha_client # tester binary
diff --git a/tests/integration/ha_basic/CMakeLists.txt b/tests/integration/ha/basic/CMakeLists.txt
similarity index 100%
rename from tests/integration/ha_basic/CMakeLists.txt
rename to tests/integration/ha/basic/CMakeLists.txt
diff --git a/tests/integration/ha_basic/raft.json b/tests/integration/ha/basic/raft.json
similarity index 100%
rename from tests/integration/ha_basic/raft.json
rename to tests/integration/ha/basic/raft.json
diff --git a/tests/integration/ha/basic/runner.py b/tests/integration/ha/basic/runner.py
new file mode 100755
index 000000000..d858c4407
--- /dev/null
+++ b/tests/integration/ha/basic/runner.py
@@ -0,0 +1,105 @@
+#!/usr/bin/python3
+
+import argparse
+import os
+import time
+import subprocess
+import sys
+import random
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
+
+# Append the parent directory to the import path so we can use ha_test.
+sys.path.append(os.path.join(SCRIPT_DIR, ".."))
+
+from ha_test import HaTestBase
+
+
+class HaBasicTest(HaTestBase):
+    def execute_step(self, step, expected_results):
+        if step == "create":
+            client = subprocess.Popen([self.tester_binary, "--step", "create",
+                    "--cluster_size", str(self.cluster_size)])
+
+        elif step == "count":
+            client = subprocess.Popen([self.tester_binary, "--step", "count",
+                    "--cluster_size", str(self.cluster_size), "--expected_results",
+                    str(expected_results)])
+        else:
+            return 0
+
+        # Check what happened with query execution.
+        try:
+            code = client.wait(timeout=30)
+        except subprocess.TimeoutExpired:
+            client.kill()
+            return 1
+
+        return code
+
+
+    def execute(self):
+        self.start_cluster()
+
+        expected_results = 0
+
+        # Make sure at least one node exists.
+        assert self.execute_step("create", expected_results) == 0, \
+                "Error while executing create query"
+        expected_results = 1
+
+        for i in range(2 * self.cluster_size):
+            partition = random.sample(range(self.cluster_size),
+                    int((self.cluster_size - 1) / 2))
+
+            # Kill workers.
+            for worker_id in partition:
+                self.kill_worker(worker_id)
+
+            time.sleep(2)  # allow some time for possible leader re-election
+
+            if random.random() < 0.7:
+                assert self.execute_step("create", expected_results) == 0, \
+                        "Error while executing create query"
+                expected_results += 1
+            else:
+                assert self.execute_step("count", expected_results) == 0, \
+                        "Error while executing count query"
+
+            # Bring workers back to life.
+            for worker_id in partition:
+                self.start_worker(worker_id)
+
+        # Check that no data was lost.
+        assert self.execute_step("count", expected_results) == 0, \
+                "Error while executing count query"
+
+
+def find_correct_path(path):
+    f = os.path.join(PROJECT_DIR, "build", path)
+    if not os.path.exists(f):
+        f = os.path.join(PROJECT_DIR, "build_debug", path)
+    return f
+
+
+if __name__ == "__main__":
+    memgraph_binary = find_correct_path("memgraph_ha")
+    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
+            "basic", "tester"))
+
+    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
+            "basic", "raft.json")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--memgraph", default=memgraph_binary)
+    parser.add_argument("--raft_config_file", default=raft_config_file)
+    args = parser.parse_args()
+
+    for cluster_size in [3, 5]:
+        print("\033[1;36m~~ Executing test with cluster size: %d ~~\033[0m" % cluster_size)
+        HaBasicTest(
+            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
+        print("\033[1;32m~~ The test finished successfully ~~\033[0m")
+
+    sys.exit(0)
diff --git a/tests/integration/ha_basic/tester.cpp b/tests/integration/ha/basic/tester.cpp
similarity index 100%
rename from tests/integration/ha_basic/tester.cpp
rename to tests/integration/ha/basic/tester.cpp
diff --git a/tests/integration/ha/ha_test.py b/tests/integration/ha/ha_test.py
new file mode 100644
index 000000000..b766a1149
--- /dev/null
+++ b/tests/integration/ha/ha_test.py
@@ -0,0 +1,109 @@
+#!/usr/bin/python3
+
+import json
+import os
+import subprocess
+import tempfile
+import time
+
+
+class HaTestBase:
+    def __init__(self, memgraph_binary, tester_binary, raft_config_file,
+            cluster_size):
+
+        self.workers = [None] * cluster_size
+        self.memgraph_binary = memgraph_binary
+        self.tester_binary = tester_binary
+        self.raft_config_file = raft_config_file
+        self.cluster_size = cluster_size
+
+        # Create a temporary directory used for durability.
+        self._tempdir = tempfile.TemporaryDirectory()
+
+        # Generate the coordination config file.
+        self.coordination_config_file = tempfile.NamedTemporaryFile()
+        coordination = self._generate_json_coordination_config()
+        self.coordination_config_file.write(bytes(coordination, "UTF-8"))
+        self.coordination_config_file.flush()
+
+        self.execute()
+
+
+    def __del__(self):
+        for worker in self.workers:
+            if worker is None: continue
+            worker.kill()
+            worker.wait()
+        self.workers.clear()
+        self.coordination_config_file.close()
+
+
+    def start_cluster(self):
+        for worker_id in range(self.cluster_size):
+            self.start_worker(worker_id)
+
+        # Allow some time for leader election.
+        time.sleep(5)
+
+
+    def kill_worker(self, worker_id):
+        assert 0 <= worker_id < self.cluster_size, \
+                "Invalid worker ID {}".format(worker_id)
+        assert self.workers[worker_id] is not None, \
+                "Worker {} doesn't exist".format(worker_id)
+
+        self.workers[worker_id].kill()
+        self.workers[worker_id].wait()
+        self.workers[worker_id] = None
+
+
+    def start_worker(self, worker_id):
+        assert 0 <= worker_id < self.cluster_size, \
+                "Invalid worker ID {}".format(worker_id)
+        assert self.workers[worker_id] is None, \
+                "Worker {} already exists".format(worker_id)
+
+        self.workers[worker_id] = subprocess.Popen(self._generate_args(worker_id))
+
+        time.sleep(0.2)
+        assert self.workers[worker_id].poll() is None, \
+                "Worker {} process died prematurely!".format(worker_id)
+
+        self._wait_for_server(7687 + worker_id)
+
+
+    def execute(self):
+        raise NotImplementedError()
+
+
+    def _wait_for_server(self, port, delay=0.1):
+        cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
+        while subprocess.call(cmd) != 0:
+            time.sleep(delay)
+        time.sleep(delay)
+
+
+    def get_durability_directory(self, worker_id):
+        return os.path.join(self._tempdir.name, "worker" + str(worker_id))
+
+
+    def _generate_args(self, worker_id):
+        args = [self.memgraph_binary]
+        args.extend(["--server_id", str(worker_id + 1)])
+        args.extend(["--port", str(7687 + worker_id)])
+        args.extend(["--raft_config_file", self.raft_config_file])
+        args.extend(["--coordination_config_file",
+                self.coordination_config_file.name])
+
+        # Each worker must have a unique durability directory.
+ args.extend(["--durability_directory", + self.get_durability_directory(worker_id)]) + return args + + + def _generate_json_coordination_config(self): + data = [] + for i in range(self.cluster_size): + data.append([i + 1, "127.0.0.1", 10000 + i]) + return json.dumps(data) + diff --git a/tests/integration/ha/log_compaction/raft.json b/tests/integration/ha/log_compaction/raft.json new file mode 100644 index 000000000..6528d3228 --- /dev/null +++ b/tests/integration/ha/log_compaction/raft.json @@ -0,0 +1,6 @@ +{ + "election_timeout_min": 200, + "election_timeout_max": 500, + "heartbeat_interval": 100, + "log_size_snapshot_threshold": 100 +} diff --git a/tests/integration/ha/log_compaction/runner.py b/tests/integration/ha/log_compaction/runner.py new file mode 100755 index 000000000..7788cbd21 --- /dev/null +++ b/tests/integration/ha/log_compaction/runner.py @@ -0,0 +1,101 @@ +#!/usr/bin/python3 + +import argparse +import hashlib +import os +import time +import subprocess +import sys +import random + +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", "..")) + +# append parent directory +sys.path.append(os.path.join(SCRIPT_DIR, "..")) + +from ha_test import HaTestBase + + +def shasum(filename): + with open(filename, 'rb') as f: + return hashlib.sha1(f.read()).hexdigest() + + raise Exception("Couldn't get shasum for file {}".format(filename)) + + +class HaLogCompactionTest(HaTestBase): + def execute_step(self, query): + client = subprocess.Popen( + [self.tester_binary, "--cluster_size", str(self.cluster_size)], + stdin=subprocess.PIPE) + + try: + client.communicate(input=bytes(query, "UTF-8"), timeout=30) + except subprocess.TimeoutExpired as e: + client.kill() + client.communicate() + return 1 + + return client.returncode + + + def get_snapshot_path(self, worker_id): + dur = os.path.join(self.get_durability_directory(worker_id), "snapshots") + snapshots = os.listdir(dur) + + assert len(snapshots) == 1, \ + "More than one snapshot on worker {}!".format(worker_id) + return os.path.join(dur, snapshots[0]) + + + def execute(self): + # custom cluster startup + for worker_id in range(1, self.cluster_size): + self.start_worker(worker_id) + + time.sleep(5) + self.execute_step("CREATE (:Node)\n" * 128) + + self.start_worker(0) + + # allow some time for the snapshot transfer + time.sleep(5) + snapshot_shasum = shasum(self.get_snapshot_path(0)) + + success = False + for worker_id in range(1, self.cluster_size): + if shasum(self.get_snapshot_path(worker_id)) == snapshot_shasum: + success = True + break + + assert success, "Snapshot didn't transfer successfully" + + +def find_correct_path(path): + f = os.path.join(PROJECT_DIR, "build", path) + if not os.path.exists(f): + f = os.path.join(PROJECT_DIR, "build_debug", path) + return f + + +if __name__ == "__main__": + memgraph_binary = find_correct_path("memgraph_ha") + tester_binary = find_correct_path(os.path.join("tests", "manual", + "ha_client")) + + raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha", + "log_compaction", "raft.json") + + parser = argparse.ArgumentParser() + parser.add_argument("--memgraph", default=memgraph_binary) + parser.add_argument("--raft_config_file", default=raft_config_file) + args = parser.parse_args() + + for cluster_size in [3, 5]: + print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) + HaLogCompactionTest( + args.memgraph, tester_binary, args.raft_config_file, cluster_size) + 
print("\033[1;32m~~ The test finished successfully ~~\033[0m") + + sys.exit(0) diff --git a/tests/integration/ha_basic/coordination.json b/tests/integration/ha_basic/coordination.json deleted file mode 100644 index 35b8c65aa..000000000 --- a/tests/integration/ha_basic/coordination.json +++ /dev/null @@ -1,5 +0,0 @@ -[ - [1, "127.0.0.1", 10000], - [2, "127.0.0.1", 10001], - [3, "127.0.0.1", 10002] -] diff --git a/tests/integration/ha_basic/runner.py b/tests/integration/ha_basic/runner.py deleted file mode 100755 index 5b6ac60d2..000000000 --- a/tests/integration/ha_basic/runner.py +++ /dev/null @@ -1,201 +0,0 @@ -#!/usr/bin/python3 -import argparse -import atexit -import json -import os -import subprocess -import tempfile -import time -import sys -import random - -SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) -PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..")) - -workers = [] - - -def cleanup(): - for worker in workers: - worker.kill() - worker.wait() - workers.clear() - - -def wait_for_server(port, delay=0.1): - cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)] - while subprocess.call(cmd) != 0: - time.sleep(0.01) - time.sleep(delay) - - -def generate_args(memgraph_binary, temporary_dir, worker_id, raft_config_file, - coordination_config_file): - args = [memgraph_binary] - args.extend(["--server_id", str(worker_id + 1), "--port", str(7687 + worker_id)]) - args.extend(["--raft_config_file", raft_config_file]) - args.extend(["--coordination_config_file", coordination_config_file]) - - # Each worker must have a unique durability directory. - args.extend(["--durability_directory", - os.path.join(temporary_dir, "worker" + str(worker_id))]) - return args - - -def execute_step(tester_binary, cluster_size, expected_results, step): - if step == "create": - client = subprocess.Popen([tester_binary, "--step", "create", - "--cluster_size", str(cluster_size)]) - - elif step == "count": - client = subprocess.Popen([tester_binary, "--step", "count", - "--cluster_size", str(cluster_size), "--expected_results", - str(expected_results)]) - else: - return 0 - - # Check what happened with query execution. - try: - code = client.wait(timeout=30) - except subprocess.TimeoutExpired as e: - client.kill() - raise e - - return code - - -def execute_test(memgraph_binary, tester_binary, raft_config_file, - coordination_config_file, cluster_size): - - print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) - global workers - - def kill_worker(worker_id): - print("Killing worker %d" % (worker_id + 1)) - global workers - workers[worker_id].kill() - workers[worker_id].wait() - - def start_worker(worker_id): - print("Starting worker %d" % (worker_id + 1)) - workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary, - tempdir.name, worker_id, raft_config_file, - coordination_config_file)) - time.sleep(0.2) - assert workers[worker_id].poll() is None, \ - "Worker{} process died prematurely!".format(worker_id) - wait_for_server(7687 + worker_id) - - # Get a temporary directory used for durability. - tempdir = tempfile.TemporaryDirectory() - - # Start the cluster. - cleanup() - for worker_id in range(cluster_size): - workers.append(subprocess.Popen(generate_args(memgraph_binary, - tempdir.name, worker_id, raft_config_file, - coordination_config_file))) - - time.sleep(0.2) - assert workers[worker_id].poll() is None, \ - "Worker{} process died prematurely!".format(worker_id) - - # Wait for the cluster to start. 
-    for worker_id in range(cluster_size):
-        wait_for_server(7687 + worker_id)
-
-    time.sleep(1)
-    expected_results = 0
-
-    # Array of exit codes.
-    codes = []
-
-    code = execute_step(tester_binary, cluster_size, expected_results, "create")
-    if code == 0:
-        expected_results += 1
-    else:
-        print("The client process didn't exit cleanly (code %d)!" % code)
-        codes.append(code)
-
-    for i in range(2 * cluster_size):
-        partition = random.sample(range(cluster_size), int((cluster_size - 1) / 2))
-        # kill workers
-        for worker_id in partition:
-            kill_worker(worker_id)
-
-        time.sleep(2) # allow some time for possible leader re-election
-
-        if random.random() < 0.5:
-            print("Executing Create query")
-            code = execute_step(tester_binary, cluster_size, expected_results,
-                    "create")
-            if code == 0:
-                expected_results += 1
-            else:
-                print("The client process didn't exit cleanly (code %d)!" % code)
-                codes.append(code)
-                break
-        else:
-            print("Executing Count query")
-            code = execute_step(tester_binary, cluster_size, expected_results,
-                    "count")
-            if code != 0:
-                print("The client process didn't exit cleanly (code %d)!" % code)
-                codes.append(code)
-                break
-
-        # Bring it back to life.
-        for worker_id in partition:
-            start_worker(worker_id)
-
-    code = execute_step(tester_binary, cluster_size, expected_results, "count")
-    if code != 0:
-        print("The client process didn't exit cleanly (code %d)!" % code)
-        codes.append(code)
-
-    # Terminate all workers.
-    cleanup()
-    assert not any(codes), "Something went wrong!"
-
-
-def find_correct_path(path):
-    f = os.path.join(PROJECT_DIR, "build", path)
-    if not os.path.exists(f):
-        f = os.path.join(PROJECT_DIR, "build_debug", path)
-    return f
-
-
-def generate_json_coordination_config(cluster_size):
-    data = []
-    for i in range(cluster_size):
-        data.append([i + 1, "127.0.0.1", 10000 + i])
-    return json.dumps(data)
-
-
-if __name__ == "__main__":
-    memgraph_binary = find_correct_path("memgraph_ha")
-    tester_binary = find_correct_path(os.path.join("tests", "integration",
-            "ha_basic", "tester"))
-
-    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
-            "ha_basic", "raft.json")
-
-    parser = argparse.ArgumentParser()
-    parser.add_argument("--memgraph", default=memgraph_binary)
-    parser.add_argument("--tester", default=tester_binary)
-    parser.add_argument("--raft_config_file", default=raft_config_file)
-    args = parser.parse_args()
-
-    for cluster_size in [3, 5]:
-        tmpfile = tempfile.NamedTemporaryFile()
-        coordination = generate_json_coordination_config(cluster_size)
-        tmpfile.write(bytes(coordination, "UTF-8"))
-        tmpfile.flush()
-
-        execute_test(args.memgraph, args.tester, args.raft_config_file,
-                tmpfile.name, cluster_size)
-
-        tmpfile.close()
-
-    print("\033[1;32m~~ The test finished successfully ~~\033[0m")
-    sys.exit(0)
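
Note: in the log compaction test, the "log_size_snapshot_threshold": 100
setting in raft.json appears to be what forces a snapshot once the 128 CREATE
statements are committed, so worker 0, which starts only afterwards, can
catch up solely through a snapshot transfer. Below is a minimal standalone
sketch of the byte-for-byte snapshot comparison the test performs; the
durability root and the cluster size are hypothetical, for illustration only
(the real test uses a temporary directory managed by HaTestBase):

    #!/usr/bin/python3
    import hashlib
    import os


    def shasum(filename):
        """Return the SHA-1 hex digest of a file's contents."""
        with open(filename, "rb") as f:
            return hashlib.sha1(f.read()).hexdigest()


    def snapshot_digest(durability_dir):
        """Each worker is expected to hold exactly one snapshot file."""
        snapshot_dir = os.path.join(durability_dir, "snapshots")
        snapshots = os.listdir(snapshot_dir)
        assert len(snapshots) == 1, "Expected exactly one snapshot!"
        return shasum(os.path.join(snapshot_dir, snapshots[0]))


    if __name__ == "__main__":
        base = "/tmp/ha_durability"  # hypothetical durability root
        # Digest of the snapshot received by the freshly started worker 0.
        reference = snapshot_digest(os.path.join(base, "worker0"))
        # It must match the snapshot on at least one of the other workers
        # (a hypothetical cluster size of 3 is assumed here).
        others = [snapshot_digest(os.path.join(base, "worker" + str(i)))
                  for i in range(1, 3)]
        print("snapshot transferred intact:", reference in others)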