Include HA client in HA basic integration test
Summary: Included HA client; fixed log messages to be 1-indexed; added id properties to created nodes for easier debugging; create and check steps are now executed 20 times each. Reviewers: msantl. Reviewed By: msantl. Subscribers: pullbot. Differential Revision: https://phabricator.memgraph.io/D2111
This commit is contained in:
parent
45413c0612
commit
9d932797ab
@ -1,6 +1,6 @@
|
||||
{
|
||||
"election_timeout_min": 200,
|
||||
"election_timeout_max": 500,
|
||||
"election_timeout_min": 750,
|
||||
"election_timeout_max": 1000,
|
||||
"heartbeat_interval": 100,
|
||||
"replication_timeout": 10000,
|
||||
"log_size_snapshot_threshold": -1
|
||||
|
@ -1,5 +1,19 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
"""
|
||||
This test checks the basic functionality of HA Memgraph. It incorporates
|
||||
both leader election and log replication processes.
|
||||
|
||||
The test proceeds as follows for clusters of size 3 and 5:
|
||||
1) Start the whole cluster
|
||||
2) Kill random workers but leave the majority alive
|
||||
3) Create a single Node
|
||||
4) Bring dead nodes back to life
|
||||
5) Kill random workers but leave the majority alive
|
||||
6) Check if everything is ok with DB state
|
||||
7) GOTO 1) and repeat 20 times
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import time
|
||||
@ -17,17 +31,19 @@ from ha_test import HaTestBase
|
||||
|
||||
|
||||
class HaBasicTest(HaTestBase):
|
||||
def execute_step(self, step, expected_results):
|
||||
def execute_step(self, step, node_count):
|
||||
if step == "create":
|
||||
print("Executing create query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "create",
|
||||
"--cluster_size", str(self.cluster_size)])
|
||||
|
||||
client = subprocess.Popen([self.tester_binary,
|
||||
"--step", "create",
|
||||
"--cluster-size", str(self.cluster_size),
|
||||
"--node-count", str(node_count)])
|
||||
elif step == "count":
|
||||
print("Executing count query")
|
||||
client = subprocess.Popen([self.tester_binary, "--step", "count",
|
||||
"--cluster_size", str(self.cluster_size), "--expected_results",
|
||||
str(expected_results)])
|
||||
client = subprocess.Popen([self.tester_binary,
|
||||
"--step", "count",
|
||||
"--cluster_size", str(self.cluster_size),
|
||||
"--node-count", str(node_count)])
|
||||
else:
|
||||
return 0
|
||||
|
||||
@ -42,6 +58,18 @@ class HaBasicTest(HaTestBase):
|
||||
return code
|
||||
|
||||
|
||||
def start_workers(self, worker_ids):
|
||||
for wid in worker_ids:
|
||||
print("Starting worker {}".format(wid + 1))
|
||||
self.start_worker(wid)
|
||||
|
||||
|
||||
def kill_workers(self, worker_ids):
|
||||
for wid in worker_ids:
|
||||
print("Killing worker {}".format(wid + 1))
|
||||
self.kill_worker(wid)
|
||||
|
||||
|
||||
def execute(self):
|
||||
self.start_cluster()
|
||||
|
||||
@ -52,31 +80,29 @@ class HaBasicTest(HaTestBase):
|
||||
"Error while executing create query"
|
||||
expected_results = 1
|
||||
|
||||
for i in range(2 * self.cluster_size):
|
||||
for i in range(20):
|
||||
# Create step
|
||||
partition = random.sample(range(self.cluster_size),
|
||||
int((self.cluster_size - 1) / 2))
|
||||
random.randint(0, int((self.cluster_size - 1) / 2)))
|
||||
|
||||
# Kill workers.
|
||||
for worker_id in partition:
|
||||
print("Killing worker {}".format(worker_id))
|
||||
self.kill_worker(worker_id)
|
||||
self.kill_workers(partition)
|
||||
|
||||
time.sleep(5) # allow some time for possible leader re-election
|
||||
|
||||
if random.random() < 0.7:
|
||||
assert self.execute_step("create", expected_results) == 0, \
|
||||
"Error while executing create query"
|
||||
expected_results += 1
|
||||
else:
|
||||
|
||||
self.start_workers(partition)
|
||||
|
||||
# Check step
|
||||
partition = random.sample(range(self.cluster_size),
|
||||
random.randint(0, int((self.cluster_size - 1) / 2)))
|
||||
|
||||
self.kill_workers(partition)
|
||||
|
||||
assert self.execute_step("count", expected_results) == 0, \
|
||||
"Error while executing count query"
|
||||
|
||||
# Bring workers back to life.
|
||||
for worker_id in partition:
|
||||
print("Starting worker {}".format(worker_id))
|
||||
self.start_worker(worker_id)
|
||||
|
||||
time.sleep(5) # allow some time for possible leader re-election
|
||||
self.start_workers(partition)
|
||||
|
||||
# Check that no data was lost.
|
||||
assert self.execute_step("count", expected_results) == 0, \
|
||||
|
@ -5,7 +5,7 @@
|
||||
#include <gflags/gflags.h>
|
||||
#include <glog/logging.h>
|
||||
|
||||
#include "communication/bolt/client.hpp"
|
||||
#include "communication/bolt/ha_client.hpp"
|
||||
#include "io/network/endpoint.hpp"
|
||||
#include "io/network/utils.hpp"
|
||||
#include "utils/timer.hpp"
|
||||
@ -13,11 +13,10 @@
|
||||
DEFINE_string(address, "127.0.0.1", "Server address");
|
||||
DEFINE_int32(port, 7687, "Server port");
|
||||
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
|
||||
DEFINE_int32(expected_results, -1, "Number of expected nodes.");
|
||||
DEFINE_int32(node_count, -1, "Expected number of nodes in the database.");
|
||||
DEFINE_string(username, "", "Username for the database");
|
||||
DEFINE_string(password, "", "Password for the database");
|
||||
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
|
||||
|
||||
DEFINE_string(step, "", "The step to execute (available: create, count)");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
@ -28,53 +27,41 @@ int main(int argc, char **argv) {
|
||||
|
||||
communication::Init();
|
||||
|
||||
bool successful = false;
|
||||
for (int retry = 0; !successful && retry < 10; ++retry) {
|
||||
for (int i = 0; !successful && i < FLAGS_cluster_size; ++i) {
|
||||
try {
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::Client client(&context);
|
||||
std::vector<io::network::Endpoint> endpoints(FLAGS_cluster_size);
|
||||
for (int i = 0; i < FLAGS_cluster_size; ++i)
|
||||
endpoints[i] = io::network::Endpoint(FLAGS_address, FLAGS_port + i);
|
||||
|
||||
uint16_t port = FLAGS_port + i;
|
||||
io::network::Endpoint endpoint{FLAGS_address, port};
|
||||
client.Connect(endpoint, FLAGS_username, FLAGS_password);
|
||||
std::chrono::milliseconds retry_delay(1000);
|
||||
communication::ClientContext context(FLAGS_use_ssl);
|
||||
communication::bolt::HAClient client(endpoints, &context, FLAGS_username,
|
||||
FLAGS_password, 25, retry_delay);
|
||||
|
||||
if (FLAGS_step == "create") {
|
||||
client.Execute("create (:Node)", {});
|
||||
successful = true;
|
||||
|
||||
client.Execute("create (:Node {id: $id})",
|
||||
{{"id", FLAGS_node_count + 1}});
|
||||
return 0;
|
||||
} else if (FLAGS_step == "count") {
|
||||
auto result = client.Execute("match (n) return n", {});
|
||||
|
||||
if (result.records.size() != FLAGS_expected_results) {
|
||||
LOG(WARNING) << "Missing data: expected " << FLAGS_expected_results
|
||||
if (result.records.size() != FLAGS_node_count) {
|
||||
LOG(WARNING) << "Missing data: expected " << FLAGS_node_count
|
||||
<< ", got " << result.records.size();
|
||||
return 2;
|
||||
}
|
||||
|
||||
successful = true;
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
LOG(FATAL) << "Unexpected client step!";
|
||||
}
|
||||
} catch (const communication::bolt::ClientQueryException &) {
|
||||
// This one is not the leader, continue.
|
||||
continue;
|
||||
} catch (const communication::bolt::ClientFatalException &) {
|
||||
// This one seems to be down, continue.
|
||||
continue;
|
||||
}
|
||||
LOG(INFO) << "Current Raft cluster leader is " << i;
|
||||
}
|
||||
if (!successful) {
|
||||
LOG(INFO) << "Couldn't find Raft cluster leader, retrying.";
|
||||
std::this_thread::sleep_for(1s);
|
||||
}
|
||||
} catch (const communication::bolt::ClientQueryException &e) {
|
||||
LOG(WARNING)
|
||||
<< "Transient error while executing query. (eg. mistyped query, etc.)\n"
|
||||
<< e.what();
|
||||
} catch (const communication::bolt::ClientFatalException &e) {
|
||||
LOG(WARNING) << "Couldn't connect to server\n" << e.what();
|
||||
} catch (const utils::BasicException &e) {
|
||||
LOG(WARNING) << "Error while executing query\n" << e.what();
|
||||
}
|
||||
|
||||
if (!successful) {
|
||||
LOG(WARNING) << "Couldn't find Raft cluster leader.";
|
||||
// The test wasn't successful
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user