Extend raft integration test

Summary: Run the basic test with two cluster sizes, 3 and 5.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1794
This commit is contained in:
Matija Santl 2019-01-10 09:28:28 +01:00
parent 17654b6d6c
commit c0cc661149
2 changed files with 42 additions and 23 deletions

View File

@ -57,6 +57,5 @@
infiles: infiles:
- runner.py # runner script - runner.py # runner script
- raft.json # raft configuration - raft.json # raft configuration
- coordination.json # coordination configuration
- ../../../build_debug/memgraph_ha # memgraph distributed binary - ../../../build_debug/memgraph_ha # memgraph distributed binary
- ../../../build_debug/tests/integration/ha_basic/tester # tester binary - ../../../build_debug/tests/integration/ha_basic/tester # tester binary

View File

@ -68,6 +68,23 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file,
coordination_config_file, cluster_size): coordination_config_file, cluster_size):
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size)) print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
global workers
def kill_worker(worker_id):
print("Killing worker %d" % (worker_id + 1))
global workers
workers[worker_id].kill()
workers[worker_id].wait()
def start_worker(worker_id):
print("Starting worker %d" % (worker_id + 1))
workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary,
tempdir.name, worker_id, raft_config_file,
coordination_config_file))
time.sleep(0.2)
assert workers[worker_id].poll() is None, \
"Worker{} process died prematurely!".format(worker_id)
wait_for_server(7687 + worker_id)
# Get a temporary directory used for durability. # Get a temporary directory used for durability.
tempdir = tempfile.TemporaryDirectory() tempdir = tempfile.TemporaryDirectory()
@ -101,13 +118,12 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file,
codes.append(code) codes.append(code)
for i in range(2 * cluster_size): for i in range(2 * cluster_size):
worker_id = i % cluster_size partition = random.sample(range(cluster_size), int((cluster_size - 1) / 2))
# kill workers
for worker_id in partition:
kill_worker(worker_id)
workers[worker_id].kill() time.sleep(2) # allow some time for possible leader re-election
workers[worker_id].wait()
print("Killing worker %d" % (worker_id + 1))
time.sleep(2) # allow for possible leader re-election
if random.random() < 0.5: if random.random() < 0.5:
print("Executing Create query") print("Executing Create query")
@ -129,14 +145,8 @@ def execute_test(memgraph_binary, tester_binary, raft_config_file,
break break
# Bring it back to life. # Bring it back to life.
print("Starting worker %d" % (worker_id + 1)) for worker_id in partition:
workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary, start_worker(worker_id)
tempdir.name, worker_id, raft_config_file,
coordination_config_file))
time.sleep(0.2)
assert workers[worker_id].poll() is None, \
"Worker{} process died prematurely!".format(worker_id)
wait_for_server(7687 + worker_id)
code = execute_step(tester_binary, cluster_size, expected_results, "count") code = execute_step(tester_binary, cluster_size, expected_results, "count")
if code != 0: if code != 0:
@ -154,6 +164,14 @@ def find_correct_path(path):
f = os.path.join(PROJECT_DIR, "build_debug", path) f = os.path.join(PROJECT_DIR, "build_debug", path)
return f return f
def generate_json_coordination_config(cluster_size):
data = []
for i in range(cluster_size):
data.append([i + 1, "127.0.0.1", 10000 + i])
return json.dumps(data)
if __name__ == "__main__": if __name__ == "__main__":
memgraph_binary = find_correct_path("memgraph_ha") memgraph_binary = find_correct_path("memgraph_ha")
tester_binary = find_correct_path(os.path.join("tests", "integration", tester_binary = find_correct_path(os.path.join("tests", "integration",
@ -162,20 +180,22 @@ if __name__ == "__main__":
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
"ha_basic", "raft.json") "ha_basic", "raft.json")
coordination_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
"ha_basic", "coordination.json")
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary) parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary) parser.add_argument("--tester", default=tester_binary)
parser.add_argument("--raft_config_file", default=raft_config_file) parser.add_argument("--raft_config_file", default=raft_config_file)
parser.add_argument("--coordination_config_file",
default=coordination_config_file)
args = parser.parse_args() args = parser.parse_args()
execute_test(args.memgraph, args.tester, for cluster_size in [3, 5]:
args.raft_config_file, tmpfile = tempfile.NamedTemporaryFile()
args.coordination_config_file, 3) coordination = generate_json_coordination_config(cluster_size)
tmpfile.write(bytes(coordination, "UTF-8"))
tmpfile.flush()
execute_test(args.memgraph, args.tester, args.raft_config_file,
tmpfile.name, cluster_size)
tmpfile.close()
print("\033[1;32m~~ The test finished successfully ~~\033[0m") print("\033[1;32m~~ The test finished successfully ~~\033[0m")
sys.exit(0) sys.exit(0)