Add Raft log compaction transfer test
Summary:
I've refactored the HA integration test so that the common parts, such as starting
and stopping workers, can be reused. I've also added a test that triggers log
compaction and checks that the transferred snapshot is identical to the original one.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: mferencevic, pullbot

Differential Revision: https://phabricator.memgraph.io/D1847
This commit is contained in:
parent 53d8f725c5
commit 59e11dbf9f
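The summary describes the refactoring in terms of a reusable base class. As a minimal
sketch (not part of this commit, and assuming the new tests/integration/ha layout), a
new HA test reuses it by subclassing HaTestBase and implementing execute(), which the
base constructor calls once the cluster scaffolding is in place:

#!/usr/bin/python3
# Sketch only: the reuse pattern introduced by this commit. HaTestBase sets up
# the coordination config, temporary durability directories and worker
# management, then calls execute(), which the subclass fills with test steps.
import os
import sys

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(SCRIPT_DIR, ".."))  # make ha_test importable

from ha_test import HaTestBase


class ExampleHaTest(HaTestBase):
    def execute(self):
        # Start all workers and wait for leader election, then run the
        # test-specific logic (kill/restart workers, run queries, ...).
        self.start_cluster()


# Constructing the subclass runs the whole test, mirroring the runners below:
# ExampleHaTest(memgraph_binary, tester_binary, raft_config_file, cluster_size)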
@@ -16,5 +16,6 @@ add_subdirectory(auth)
 # distributed test binaries
 add_subdirectory(distributed)

-# raft test binaries
-add_subdirectory(ha_basic)
+# distributed ha/basic binaries
+add_subdirectory(ha/basic)
+
@@ -52,10 +52,21 @@
       - ../../../build_debug/tests/integration/distributed/tester # tester binary

 - name: integration__ha_basic
-  cd: ha_basic
+  cd: ha/basic
   commands: ./runner.py
   infiles:
     - runner.py # runner script
     - raft.json # raft configuration
-    - ../../../build_debug/memgraph_ha # memgraph distributed binary
-    - ../../../build_debug/tests/integration/ha_basic/tester # tester binary
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph distributed binary
+    - ../../../../build_debug/tests/integration/ha/basic/tester # tester binary
+
+- name: integration__ha_log_compaction
+  cd: ha/log_compaction
+  commands: ./runner.py
+  infiles:
+    - runner.py # runner script
+    - raft.json # raft configuration
+    - ../ha_test.py # raft test base module
+    - ../../../../build_debug/memgraph_ha # memgraph distributed binary
+    - ../../../../build_debug/tests/manual/ha_client # tester binary
105  tests/integration/ha/basic/runner.py  Executable file
@@ -0,0 +1,105 @@
#!/usr/bin/python3

import argparse
import os
import time
import subprocess
import sys
import random

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase


class HaBasicTest(HaTestBase):
    def execute_step(self, step, expected_results):
        if step == "create":
            client = subprocess.Popen([self.tester_binary, "--step", "create",
                "--cluster_size", str(self.cluster_size)])

        elif step == "count":
            client = subprocess.Popen([self.tester_binary, "--step", "count",
                "--cluster_size", str(self.cluster_size), "--expected_results",
                str(expected_results)])
        else:
            return 0

        # Check what happened with query execution.
        try:
            code = client.wait(timeout=30)
        except subprocess.TimeoutExpired as e:
            client.kill()
            return 1

        return code


    def execute(self):
        self.start_cluster()

        expected_results = 0

        # Make sure at least one node exists.
        assert self.execute_step("create", expected_results) == 0, \
                "Error while executing create query"
        expected_results = 1

        for i in range(2 * self.cluster_size):
            partition = random.sample(range(self.cluster_size),
                    int((self.cluster_size - 1) / 2))

            # Kill workers.
            for worker_id in partition:
                self.kill_worker(worker_id)

            time.sleep(2) # allow some time for possible leader re-election

            if random.random() < 0.7:
                assert self.execute_step("create", expected_results) == 0, \
                        "Error while executing create query"
                expected_results += 1
            else:
                assert self.execute_step("count", expected_results) == 0, \
                        "Error while executing count query"

            # Bring workers back to life.
            for worker_id in partition:
                self.start_worker(worker_id)

        # Check that no data was lost.
        assert self.execute_step("count", expected_results) == 0, \
                "Error while executing count query"


def find_correct_path(path):
    f = os.path.join(PROJECT_DIR, "build", path)
    if not os.path.exists(f):
        f = os.path.join(PROJECT_DIR, "build_debug", path)
    return f


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
            "basic", "tester"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
            "basic", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
        HaBasicTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)
109  tests/integration/ha/ha_test.py  Normal file
@@ -0,0 +1,109 @@
#!/usr/bin/python3

import json
import os
import subprocess
import tempfile
import time


class HaTestBase:
    def __init__(self, memgraph_binary, tester_binary, raft_config_file,
                 cluster_size):

        self.workers = [None for worker in range(cluster_size)]
        self.memgraph_binary = memgraph_binary
        self.tester_binary = tester_binary
        self.raft_config_file = raft_config_file
        self.cluster_size = cluster_size

        # Get a temporary directory used for durability.
        self._tempdir = tempfile.TemporaryDirectory()

        # generate coordination config file
        self.coordination_config_file = tempfile.NamedTemporaryFile()
        coordination = self._generate_json_coordination_config()
        self.coordination_config_file.write(bytes(coordination, "UTF-8"))
        self.coordination_config_file.flush()

        self.execute()


    def __del__(self):
        for worker in self.workers:
            if worker is None: continue
            worker.kill()
            worker.wait()
        self.workers.clear()
        self.coordination_config_file.close()


    def start_cluster(self):
        for worker_id in range(self.cluster_size):
            self.start_worker(worker_id)

        # allow some time for leader election
        time.sleep(5)


    def kill_worker(self, worker_id):
        assert worker_id >= 0 and worker_id < self.cluster_size, \
                "Invalid worker ID {}".format(worker_id)
        assert self.workers[worker_id] is not None, \
                "Worker {} doesn't exist".format(worker_id)

        self.workers[worker_id].kill()
        self.workers[worker_id].wait()
        self.workers[worker_id] = None


    def start_worker(self, worker_id):
        assert worker_id >= 0 and worker_id < self.cluster_size, \
                "Invalid worker ID {}".format(worker_id)
        assert self.workers[worker_id] is None, \
                "Worker {} already exists".format(worker_id)

        self.workers[worker_id] = subprocess.Popen(self._generate_args(worker_id))

        time.sleep(0.2)
        assert self.workers[worker_id].poll() is None, \
                "Worker {} process died prematurely!".format(worker_id)

        self._wait_for_server(7687 + worker_id)


    def execute(self):
        raise NotImplementedError()


    def _wait_for_server(self, port, delay=0.1):
        cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
        while subprocess.call(cmd) != 0:
            time.sleep(delay)
        time.sleep(delay)


    def get_durability_directory(self, worker_id):
        return os.path.join(self._tempdir.name, "worker" + str(worker_id))


    def _generate_args(self, worker_id):
        args = [self.memgraph_binary]
        args.extend(["--server_id", str(worker_id + 1)])
        args.extend(["--port", str(7687 + worker_id)])
        args.extend(["--raft_config_file", self.raft_config_file])
        args.extend(["--coordination_config_file",
                     self.coordination_config_file.name])

        # Each worker must have a unique durability directory.
        args.extend(["--durability_directory",
                     self.get_durability_directory(worker_id)])
        return args


    def _generate_json_coordination_config(self):
        data = []
        for i in range(self.cluster_size):
            data.append([i + 1, "127.0.0.1", 10000 + i])
        return json.dumps(data)
6  tests/integration/ha/log_compaction/raft.json  Normal file
@@ -0,0 +1,6 @@
{
  "election_timeout_min": 200,
  "election_timeout_max": 500,
  "heartbeat_interval": 100,
  "log_size_snapshot_threshold": 100
}
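The log_size_snapshot_threshold here is what the new test relies on: the runner below
pipes 128 CREATE statements into the cluster, so (assuming each write adds at least one
entry to the Raft log) the leader's log exceeds the 100-entry threshold and is compacted
into a snapshot before worker 0 is started. A tiny illustration of that relationship,
not part of the commit:

# Illustration only: the key comes from raft.json above, the 128 comes from the
# "CREATE (:Node)\n" * 128 batch issued by the log compaction runner below.
import json

with open("raft.json") as f:
    raft_config = json.load(f)

# 128 appended writes exceed the snapshot threshold, forcing log compaction.
assert raft_config["log_size_snapshot_threshold"] < 128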
101  tests/integration/ha/log_compaction/runner.py  Executable file
@@ -0,0 +1,101 @@
#!/usr/bin/python3

import argparse
import hashlib
import os
import time
import subprocess
import sys
import random

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))

# append parent directory
sys.path.append(os.path.join(SCRIPT_DIR, ".."))

from ha_test import HaTestBase


def shasum(filename):
    with open(filename, 'rb') as f:
        return hashlib.sha1(f.read()).hexdigest()

    raise Exception("Couldn't get shasum for file {}".format(filename))


class HaLogCompactionTest(HaTestBase):
    def execute_step(self, query):
        client = subprocess.Popen(
            [self.tester_binary, "--cluster_size", str(self.cluster_size)],
            stdin=subprocess.PIPE)

        try:
            client.communicate(input=bytes(query, "UTF-8"), timeout=30)
        except subprocess.TimeoutExpired as e:
            client.kill()
            client.communicate()
            return 1

        return client.returncode


    def get_snapshot_path(self, worker_id):
        dur = os.path.join(self.get_durability_directory(worker_id), "snapshots")
        snapshots = os.listdir(dur)

        assert len(snapshots) == 1, \
                "More than one snapshot on worker {}!".format(worker_id)
        return os.path.join(dur, snapshots[0])


    def execute(self):
        # custom cluster startup
        for worker_id in range(1, self.cluster_size):
            self.start_worker(worker_id)

        time.sleep(5)
        self.execute_step("CREATE (:Node)\n" * 128)

        self.start_worker(0)

        # allow some time for the snapshot transfer
        time.sleep(5)
        snapshot_shasum = shasum(self.get_snapshot_path(0))

        success = False
        for worker_id in range(1, self.cluster_size):
            if shasum(self.get_snapshot_path(worker_id)) == snapshot_shasum:
                success = True
                break

        assert success, "Snapshot didn't transfer successfully"


def find_correct_path(path):
    f = os.path.join(PROJECT_DIR, "build", path)
    if not os.path.exists(f):
        f = os.path.join(PROJECT_DIR, "build_debug", path)
    return f


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "manual",
            "ha_client"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
            "log_compaction", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
        HaLogCompactionTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)
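For local debugging, the runner can also be started by hand from its own directory; the
flags correspond to the argparse options defined above, while the relative binary path is
only an illustrative assumption:

# Hypothetical manual invocation from tests/integration/ha/log_compaction;
# --memgraph and --raft_config_file are the runner's own command line flags.
import subprocess

subprocess.check_call([
    "./runner.py",
    "--memgraph", "../../../../build_debug/memgraph_ha",
    "--raft_config_file", "raft.json",
])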
@@ -1,5 +0,0 @@
[
  [1, "127.0.0.1", 10000],
  [2, "127.0.0.1", 10001],
  [3, "127.0.0.1", 10002]
]
@@ -1,201 +0,0 @@
#!/usr/bin/python3
import argparse
import atexit
import json
import os
import subprocess
import tempfile
import time
import sys
import random

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))

workers = []


def cleanup():
    for worker in workers:
        worker.kill()
        worker.wait()
    workers.clear()


def wait_for_server(port, delay=0.1):
    cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
    while subprocess.call(cmd) != 0:
        time.sleep(0.01)
    time.sleep(delay)


def generate_args(memgraph_binary, temporary_dir, worker_id, raft_config_file,
                  coordination_config_file):
    args = [memgraph_binary]
    args.extend(["--server_id", str(worker_id + 1), "--port", str(7687 + worker_id)])
    args.extend(["--raft_config_file", raft_config_file])
    args.extend(["--coordination_config_file", coordination_config_file])

    # Each worker must have a unique durability directory.
    args.extend(["--durability_directory",
                 os.path.join(temporary_dir, "worker" + str(worker_id))])
    return args


def execute_step(tester_binary, cluster_size, expected_results, step):
    if step == "create":
        client = subprocess.Popen([tester_binary, "--step", "create",
            "--cluster_size", str(cluster_size)])

    elif step == "count":
        client = subprocess.Popen([tester_binary, "--step", "count",
            "--cluster_size", str(cluster_size), "--expected_results",
            str(expected_results)])
    else:
        return 0

    # Check what happened with query execution.
    try:
        code = client.wait(timeout=30)
    except subprocess.TimeoutExpired as e:
        client.kill()
        raise e

    return code


def execute_test(memgraph_binary, tester_binary, raft_config_file,
                 coordination_config_file, cluster_size):

    print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
    global workers

    def kill_worker(worker_id):
        print("Killing worker %d" % (worker_id + 1))
        global workers
        workers[worker_id].kill()
        workers[worker_id].wait()

    def start_worker(worker_id):
        print("Starting worker %d" % (worker_id + 1))
        workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary,
            tempdir.name, worker_id, raft_config_file,
            coordination_config_file))
        time.sleep(0.2)
        assert workers[worker_id].poll() is None, \
                "Worker{} process died prematurely!".format(worker_id)
        wait_for_server(7687 + worker_id)

    # Get a temporary directory used for durability.
    tempdir = tempfile.TemporaryDirectory()

    # Start the cluster.
    cleanup()
    for worker_id in range(cluster_size):
        workers.append(subprocess.Popen(generate_args(memgraph_binary,
            tempdir.name, worker_id, raft_config_file,
            coordination_config_file)))

        time.sleep(0.2)
        assert workers[worker_id].poll() is None, \
                "Worker{} process died prematurely!".format(worker_id)

    # Wait for the cluster to start.
    for worker_id in range(cluster_size):
        wait_for_server(7687 + worker_id)

    time.sleep(1)
    expected_results = 0

    # Array of exit codes.
    codes = []

    code = execute_step(tester_binary, cluster_size, expected_results, "create")
    if code == 0:
        expected_results += 1
    else:
        print("The client process didn't exit cleanly (code %d)!" % code)
        codes.append(code)

    for i in range(2 * cluster_size):
        partition = random.sample(range(cluster_size), int((cluster_size - 1) / 2))
        # kill workers
        for worker_id in partition:
            kill_worker(worker_id)

        time.sleep(2) # allow some time for possible leader re-election

        if random.random() < 0.5:
            print("Executing Create query")
            code = execute_step(tester_binary, cluster_size, expected_results,
                    "create")
            if code == 0:
                expected_results += 1
            else:
                print("The client process didn't exit cleanly (code %d)!" % code)
                codes.append(code)
                break
        else:
            print("Executing Count query")
            code = execute_step(tester_binary, cluster_size, expected_results,
                    "count")
            if code != 0:
                print("The client process didn't exit cleanly (code %d)!" % code)
                codes.append(code)
                break

        # Bring it back to life.
        for worker_id in partition:
            start_worker(worker_id)

    code = execute_step(tester_binary, cluster_size, expected_results, "count")
    if code != 0:
        print("The client process didn't exit cleanly (code %d)!" % code)
        codes.append(code)

    # Terminate all workers.
    cleanup()
    assert not any(codes), "Something went wrong!"


def find_correct_path(path):
    f = os.path.join(PROJECT_DIR, "build", path)
    if not os.path.exists(f):
        f = os.path.join(PROJECT_DIR, "build_debug", path)
    return f


def generate_json_coordination_config(cluster_size):
    data = []
    for i in range(cluster_size):
        data.append([i + 1, "127.0.0.1", 10000 + i])
    return json.dumps(data)


if __name__ == "__main__":
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration",
            "ha_basic", "tester"))

    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
            "ha_basic", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--tester", default=tester_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    for cluster_size in [3, 5]:
        tmpfile = tempfile.NamedTemporaryFile()
        coordination = generate_json_coordination_config(cluster_size)
        tmpfile.write(bytes(coordination, "UTF-8"))
        tmpfile.flush()

        execute_test(args.memgraph, args.tester, args.raft_config_file,
                tmpfile.name, cluster_size)

        tmpfile.close()

    print("\033[1;32m~~ The test finished successfully ~~\033[0m")
    sys.exit(0)