From c0cc661149e0e3d5b637a049510284900ba93807 Mon Sep 17 00:00:00 2001 From: Matija Santl Date: Thu, 10 Jan 2019 09:28:28 +0100 Subject: [PATCH] Extend raft integration test Summary: Run the basic test with two cluster sizes, 3 and 5. Reviewers: ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D1794 --- tests/integration/apollo_runs.yaml | 1 - tests/integration/ha_basic/runner.py | 64 ++++++++++++++++++---------- 2 files changed, 42 insertions(+), 23 deletions(-) diff --git a/tests/integration/apollo_runs.yaml b/tests/integration/apollo_runs.yaml index 587f9158b..2840a2a93 100644 --- a/tests/integration/apollo_runs.yaml +++ b/tests/integration/apollo_runs.yaml @@ -57,6 +57,5 @@ infiles: - runner.py # runner script - raft.json # raft configuration - - coordination.json # coordination configuration - ../../../build_debug/memgraph_ha # memgraph distributed binary - ../../../build_debug/tests/integration/ha_basic/tester # tester binary diff --git a/tests/integration/ha_basic/runner.py b/tests/integration/ha_basic/runner.py index f08f25541..5b6ac60d2 100755 --- a/tests/integration/ha_basic/runner.py +++ b/tests/integration/ha_basic/runner.py @@ -68,6 +68,23 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file, coordination_config_file, cluster_size): print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) + global workers + + def kill_worker(worker_id): + print("Killing worker %d" % (worker_id + 1)) + global workers + workers[worker_id].kill() + workers[worker_id].wait() + + def start_worker(worker_id): + print("Starting worker %d" % (worker_id + 1)) + workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary, + tempdir.name, worker_id, raft_config_file, + coordination_config_file)) + time.sleep(0.2) + assert workers[worker_id].poll() is None, \ + "Worker{} process died prematurely!".format(worker_id) + wait_for_server(7687 + worker_id) # Get a temporary directory used for durability. tempdir = tempfile.TemporaryDirectory() @@ -101,13 +118,12 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file, codes.append(code) for i in range(2 * cluster_size): - worker_id = i % cluster_size + partition = random.sample(range(cluster_size), int((cluster_size - 1) / 2)) + # kill workers + for worker_id in partition: + kill_worker(worker_id) - workers[worker_id].kill() - workers[worker_id].wait() - print("Killing worker %d" % (worker_id + 1)) - - time.sleep(2) # allow for possible leader re-election + time.sleep(2) # allow some time for possible leader re-election if random.random() < 0.5: print("Executing Create query") @@ -129,14 +145,8 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file, break # Bring it back to life. - print("Starting worker %d" % (worker_id + 1)) - workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary, - tempdir.name, worker_id, raft_config_file, - coordination_config_file)) - time.sleep(0.2) - assert workers[worker_id].poll() is None, \ - "Worker{} process died prematurely!".format(worker_id) - wait_for_server(7687 + worker_id) + for worker_id in partition: + start_worker(worker_id) code = execute_step(tester_binary, cluster_size, expected_results, "count") if code != 0: @@ -154,6 +164,14 @@ def find_correct_path(path): f = os.path.join(PROJECT_DIR, "build_debug", path) return f + +def generate_json_coordination_config(cluster_size): + data = [] + for i in range(cluster_size): + data.append([i + 1, "127.0.0.1", 10000 + i]) + return json.dumps(data) + + if __name__ == "__main__": memgraph_binary = find_correct_path("memgraph_ha") tester_binary = find_correct_path(os.path.join("tests", "integration", @@ -162,20 +180,22 @@ if __name__ == "__main__": raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha_basic", "raft.json") - coordination_config_file = os.path.join(PROJECT_DIR, "tests", "integration", - "ha_basic", "coordination.json") - parser = argparse.ArgumentParser() parser.add_argument("--memgraph", default=memgraph_binary) parser.add_argument("--tester", default=tester_binary) parser.add_argument("--raft_config_file", default=raft_config_file) - parser.add_argument("--coordination_config_file", - default=coordination_config_file) args = parser.parse_args() - execute_test(args.memgraph, args.tester, - args.raft_config_file, - args.coordination_config_file, 3) + for cluster_size in [3, 5]: + tmpfile = tempfile.NamedTemporaryFile() + coordination = generate_json_coordination_config(cluster_size) + tmpfile.write(bytes(coordination, "UTF-8")) + tmpfile.flush() + + execute_test(args.memgraph, args.tester, args.raft_config_file, + tmpfile.name, cluster_size) + + tmpfile.close() print("\033[1;32m~~ The test finished successfully ~~\033[0m") sys.exit(0)