Add an integration test which replicates large log entries
Summary:
At the moment, this test will fail. There are currently two issues with it:
- a Cap'n Proto limit exception might occur
- the `db.Reset()` method can hang (deadlock)

The first issue should be resolved by the migration to SLK, and we are currently working on the second one. The test will land after those are fixed.

Reviewers: msantl, mferencevic
Reviewed By: msantl
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D2006
This commit is contained in:
parent
d0f05c6cc5
commit
d7acf75a78
@ -22,6 +22,9 @@ add_subdirectory(ha/basic)
|
||||
# distributed ha/index binaries
add_subdirectory(ha/index)

# distributed ha/large_log_entries binaries
add_subdirectory(ha/large_log_entries)

# distributed ha/term_updates binaries
add_subdirectory(ha/term_updates)
|
||||
|
||||
|
@ -92,6 +92,16 @@
|
||||
- ../../../../build_debug/memgraph_ha # memgraph ha binary
|
||||
- ../../../../build_debug/tests/integration/ha/index/tester # tester binary
|
||||
|
||||
- name: integration__ha_large_log_entries
|
||||
cd: ha/large_log_entries
|
||||
commands: TIMEOUT=360 ./runner.py
|
||||
infiles:
|
||||
- runner.py # runner script
|
||||
- raft.json # raft configuration
|
||||
- ../ha_test.py # raft test base module
|
||||
- ../../../../build_debug/memgraph_ha # memgraph ha binary
|
||||
- ../../../../build_debug/tests/integration/ha/large_log_entries/tester # tester binary
|
||||
|
||||
- name: integration__ha_log_compaction
|
||||
cd: ha/log_compaction
|
||||
commands: ./runner.py
|
||||
|
6
tests/integration/ha/large_log_entries/CMakeLists.txt
Normal file
6
tests/integration/ha/large_log_entries/CMakeLists.txt
Normal file
@ -0,0 +1,6 @@
|
||||
# Target names for the large_log_entries HA integration test.
set(target_name memgraph__integration__ha_large_log_entries)
set(tester_target_name ${target_name}__tester)

# Bolt client binary that runner.py invokes to issue the create/check
# queries against the HA cluster; installed next to the test as "tester".
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)
|
7
tests/integration/ha/large_log_entries/raft.json
Normal file
7
tests/integration/ha/large_log_entries/raft.json
Normal file
@ -0,0 +1,7 @@
|
||||
{
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
}
|
94
tests/integration/ha/large_log_entries/runner.py
Executable file
94
tests/integration/ha/large_log_entries/runner.py
Executable file
@ -0,0 +1,94 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
import random
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", "..", ".."))
|
||||
|
||||
# append parent directory
|
||||
sys.path.append(os.path.join(SCRIPT_DIR, ".."))
|
||||
|
||||
from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaLargeLogEntriesTest(HaTestBase):
    """Replicates large Raft log entries while workers are killed and
    brought back, verifying the node count survives each failover."""

    def execute_step(self, step, node_count):
        """Run the tester binary for a single step.

        `step` is either "create" or "check"; any other value is a no-op
        that returns 0.  Returns the tester's exit code, or 1 if the
        client did not finish within the timeout.
        """
        if step not in ("create", "check"):
            return 0

        print("Executing {} query".format(step))
        # The node count travels under a step-specific flag:
        # --create_nodes for "create", --check_nodes for "check".
        count_flag = "--{}_nodes".format(step)
        proc = subprocess.Popen([
            self.tester_binary,
            "--step", step,
            "--cluster_size", str(self.cluster_size),
            count_flag, str(node_count),
        ])

        # Check what happened with query execution.
        try:
            return proc.wait(timeout=120)
        except subprocess.TimeoutExpired:
            print("HA client timed out!")
            proc.kill()
            return 1

    def execute(self):
        # Number of nodes to be created in a single batch.
        batch_size = 250000

        self.start_cluster()

        for worker_id in range(self.cluster_size):
            assert self.execute_step("create", batch_size) == 0, \
                "Error while executing create query"

            # Kill worker.
            print("Killing worker {}".format(worker_id))
            self.kill_worker(worker_id)

            # All batches created so far must still be visible.
            assert self.execute_step(
                "check", (worker_id + 1) * batch_size) == 0, \
                "Error while executing check query"

            # Bring worker back to life.
            print("Starting worker {}".format(worker_id))
            self.start_worker(worker_id)
|
||||
|
||||
|
||||
def find_correct_path(path, build_dirs=("build", "build_debug")):
    """Locate `path` under one of the project's build directories.

    Tries each directory in `build_dirs` (relative to PROJECT_DIR) in
    order and returns the first candidate that exists on disk.  If none
    exists, the last candidate is returned unchanged so that the caller
    fails later with a meaningful "no such file" error — this preserves
    the original fallback-to-build_debug behavior.

    Args:
        path: path relative to a build directory (e.g. "memgraph_ha").
        build_dirs: candidate build directory names, tried in order.
    """
    candidate = None
    for build_dir in build_dirs:
        candidate = os.path.join(PROJECT_DIR, build_dir, path)
        if os.path.exists(candidate):
            return candidate
    return candidate
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Resolve binaries regardless of which build directory was used.
    memgraph_binary = find_correct_path("memgraph_ha")
    tester_binary = find_correct_path(os.path.join("tests", "integration", "ha",
                                                   "large_log_entries", "tester"))

    # Raft configuration shared by every cluster size in this test.
    raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration", "ha",
                                    "large_log_entries", "raft.json")

    parser = argparse.ArgumentParser()
    parser.add_argument("--memgraph", default=memgraph_binary)
    parser.add_argument("--raft_config_file", default=raft_config_file)
    args = parser.parse_args()

    # Run the scenario against clusters of 3 and 5 machines.
    for cluster_size in [3, 5]:
        print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
        # NOTE(review): the test object is only constructed here and no
        # method (e.g. execute()) is called on it — presumably
        # HaTestBase.__init__ drives the whole test run; confirm against
        # ../ha_test.py.
        HaLargeLogEntriesTest(
            args.memgraph, tester_binary, args.raft_config_file, cluster_size)
        print("\033[1;32m~~ The test finished successfully ~~\033[0m")

    sys.exit(0)
|
71
tests/integration/ha/large_log_entries/tester.cpp
Normal file
71
tests/integration/ha/large_log_entries/tester.cpp
Normal file
@ -0,0 +1,71 @@
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
|
||||
// Command-line configuration for the tester client.  The cluster is
// expected on consecutive ports starting at --port; --step selects the
// action and --create_nodes / --check_nodes parameterize it.
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_int32(create_nodes, 250000, "Number of nodes to be created.");
DEFINE_int32(check_nodes, -1, "Number of nodes that should be in the database");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "The step to execute (available: create, check)");

using namespace std::chrono_literals;
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
gflags::ParseCommandLineFlags(&argc, &argv, true);
|
||||
google::InitGoogleLogging(argv[0]);
|
||||
|
||||
communication::Init();
|
||||
try {
|
||||
std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i)
|
||||
endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);
|
||||
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 20, retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute("UNWIND RANGE(1, " + std::to_string(FLAGS_create_nodes) +
|
||||
") AS x CREATE(:Node {id:x})",
|
||||
{});
|
||||
return 0;
|
||||
} else if (FLAGS_step == "check") {
|
||||
auto result = client.Execute("MATCH (n) RETURN COUNT(n)", {});
|
||||
if (result.records[0][0].ValueInt() != FLAGS_check_nodes) {
|
||||
LOG(WARNING) << "Wrong number of nodes! Got " +
|
||||
std::to_string(result.records[0][0].ValueInt()) +
|
||||
", but expected " +
|
||||
std::to_string(FLAGS_check_nodes);
|
||||
return 2;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(WARNING)
|
||||
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
|
||||
<< e.what();
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
LOG(WARNING) << "Couldn't connect to server\n" << e.what();
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query\n" << e.what();
|
||||
}
|
||||
|
||||
// The test wasn't successfull
|
||||
return 1;
|
||||
}
|
Loading…
Reference in New Issue
Block a user