Add automated test for Raft

Summary:
Created a new integration test for the Raft protocol.

The test iterates through the Raft cluster and, for each machine `X`, does the following:
* kill machine `X`
* execute a query
* bring `X` back to life

The first step is to insert a vertex into the cluster, and the last step is to check
that the cluster still has all the data.
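
In outline, the new `runner.py` does roughly what the sketch below shows (a simplified illustration, not the actual test code; `start_worker` and `run_tester` are hypothetical stand-ins for the subprocess plumbing the script does):

```python
import random
import time


def run_cluster_test(start_worker, run_tester, cluster_size=3):
    # `start_worker(worker_id)` is assumed to return a process handle for one
    # memgraph_ha instance, and `run_tester(step, expected)` the tester's exit
    # code for the given step ("create" or "count").
    workers = [start_worker(i) for i in range(cluster_size)]
    expected = 0

    # Seed the cluster with a single vertex.
    assert run_tester("create", expected) == 0
    expected += 1

    for i in range(2 * cluster_size):
        worker_id = i % cluster_size

        # Kill machine X.
        workers[worker_id].kill()
        workers[worker_id].wait()
        time.sleep(2)  # allow for a possible leader re-election

        # Execute a query against the machines that are still alive.
        if random.random() < 0.5:
            if run_tester("create", expected) == 0:
                expected += 1
        else:
            assert run_tester("count", expected) == 0

        # Bring X back to life.
        workers[worker_id] = start_worker(worker_id)

    # Finally, check that the cluster still has all the data.
    assert run_tester("count", expected) == 0
```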

I also edited some of the Raft core files because this test surfaced some bugs.

The `tester` binary is a hacked version of the HA client, and so are the parts of
the code that refuse to execute a query if the machine is not in `Leader` mode.
Those parts will go away once we have a proper HA client.
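
Until then, a client discovers the leader simply by trying every cluster member until one accepts the query. Here is a hypothetical Python equivalent of what the C++ `tester` does (assuming an external Bolt driver such as the `neo4j` package; the addresses, credentials, and retry counts are placeholders, not values from this change):

```python
import time

from neo4j import GraphDatabase  # any Bolt driver works; Memgraph speaks Bolt


def execute_on_leader(query, ports=(7687, 7688, 7689), retries=10):
    """Try every cluster member until one (the leader) accepts the query."""
    for _ in range(retries):
        for port in ports:
            try:
                driver = GraphDatabase.driver("bolt://127.0.0.1:%d" % port,
                                              auth=("", ""))
                with driver.session() as session:
                    return session.run(query).data()
            except Exception:
                # Followers reject the query and dead machines refuse the
                # connection; in both cases just move on to the next one.
                continue
        time.sleep(1)  # no leader found, wait out an election and retry
    raise RuntimeError("couldn't find the Raft cluster leader")
```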

I've run `runner.py` in a loop for a while (215 iterations)
```
while ./runner.py &> log.txt; do echo -n "."; done
```
and it didn't break.

Reviewers: ipaljak, mferencevic

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1788
Matija Santl 2019-01-04 16:07:57 +01:00
parent 664622f68e
commit f53913e053
13 changed files with 378 additions and 71 deletions

View File

@@ -561,6 +561,13 @@ Interpreter::Results Interpreter::operator()(
const std::string &query_string, database::GraphDbAccessor &db_accessor,
const std::map<std::string, PropertyValue> &params,
bool in_explicit_transaction) {
#ifdef MG_SINGLE_NODE_HA
if (!db_accessor.raft()->IsLeader()) {
throw QueryException(
"Memgraph High Availability: Can't execute queries if not leader.");
}
#endif
utils::Timer frontend_timer;
// Strip the input query.

View File

@@ -55,15 +55,13 @@ class MissingPersistentDataException : public RaftException {
};
/// This exception should be thrown when a `RaftServer` instance attempts to
/// read from replication log from a invalid mode or for a garbage collected
/// transaction.
/// read from replication log for a garbage collected transaction or a
/// transaction that didn't begin.
class InvalidReplicationLogLookup : public RaftException {
public:
using RaftException::RaftException;
InvalidReplicationLogLookup()
: RaftException(
"Replication log lookup for invalid transaction or from invalid "
"mode.") {}
: RaftException("Replication log lookup for invalid transaction.") {}
};
} // namespace raft

View File

@@ -18,6 +18,9 @@ class RaftInterface {
/// committed in local storage.
virtual bool SafeToCommit(const tx::TransactionId &tx_id) = 0;
/// Returns true if the current servers mode is LEADER. False otherwise.
virtual bool IsLeader() = 0;
protected:
~RaftInterface() {}
};

View File

@@ -118,6 +118,12 @@ void RaftServer::Start() {
if (mode_ != Mode::FOLLOWER) Transition(Mode::FOLLOWER);
}
// [Raft thesis 3.4]
// A server remains in follower state as long as it receives valid RPCs from
// a leader or candidate.
SetNextElectionTimePoint();
election_change_.notify_all();
// [Raft paper 5.3]
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)
@@ -245,6 +251,11 @@ void RaftServer::Emplace(const database::StateDelta &delta) {
bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
switch (mode_) {
case Mode::CANDIDATE:
// When Memgraph first starts, the Raft is initialized in candidate
// mode and we try to perform recovery. Since everything for recovery
// needs to be able to commit, we return true.
return true;
case Mode::FOLLOWER:
// When in follower mode, we will only try to apply a Raft Log when we
// receive a commit index greater or equal from the Log index from the
@@ -257,19 +268,18 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
if (rlog_->is_active(tx_id)) return false;
if (rlog_->is_replicated(tx_id)) return true;
// The only possibility left is that our ReplicationLog doesn't contain
// information about that tx. Let's log that on our way out.
break;
case Mode::CANDIDATE:
// information about that tx.
throw InvalidReplicationLogLookup();
break;
}
throw InvalidReplicationLogLookup();
}
void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
rlog_->garbage_collect_older(tx_id);
}
bool RaftServer::IsLeader() { return mode_ == Mode::LEADER; }
RaftServer::LogEntryBuffer::LogEntryBuffer(RaftServer *raft_server)
: raft_server_(raft_server) {
CHECK(raft_server_) << "RaftServer can't be nullptr";
@@ -300,14 +310,6 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
log.emplace_back(std::move(delta));
logs_.erase(it);
raft_server_->AppendToLog(tx_id, log);
// Make sure that this wasn't a read query (contains transaction begin and
// commit).
if (log.size() == 2) {
DCHECK(log[0].type == database::StateDelta::Type::TRANSACTION_BEGIN)
<< "Raft log of size two doesn't start with TRANSACTION_BEGIN";
return;
}
} else if (delta.type == database::StateDelta::Type::TRANSACTION_ABORT) {
auto it = logs_.find(tx_id);
CHECK(it != logs_.end()) << "Missing StateDeltas for transaction " << tx_id;
@@ -320,22 +322,29 @@ void RaftServer::LogEntryBuffer::Emplace(const database::StateDelta &delta) {
void RaftServer::Transition(const Mode &new_mode) {
switch (new_mode) {
case Mode::FOLLOWER: {
if (mode_ == Mode::LEADER) {
VLOG(40) << "Server " << server_id_
<< ": Transition to FOLLOWER (Term: " << CurrentTerm() << ")";
bool reset = mode_ == Mode::LEADER;
mode_ = Mode::FOLLOWER;
log_entry_buffer_.Disable();
if (reset) {
// Temporaray freeze election timer while we do the reset.
next_election_ = TimePoint::max();
reset_callback_();
ResetReplicationLog();
}
LOG(INFO) << "Server " << server_id_
<< ": Transition to FOLLOWER (Term: " << CurrentTerm() << ")";
mode_ = Mode::FOLLOWER;
log_entry_buffer_.Disable();
SetNextElectionTimePoint();
election_change_.notify_all();
break;
}
case Mode::CANDIDATE: {
LOG(INFO) << "Server " << server_id_
<< ": Transition to CANDIDATE (Term: " << CurrentTerm() << ")";
VLOG(40) << "Server " << server_id_
<< ": Transition to CANDIDATE (Term: " << CurrentTerm() << ")";
log_entry_buffer_.Disable();
// [Raft thesis, section 3.4]
@@ -369,8 +378,8 @@ void RaftServer::Transition(const Mode &new_mode) {
}
case Mode::LEADER: {
LOG(INFO) << "Server " << server_id_
<< ": Transition to LEADER (Term: " << CurrentTerm() << ")";
VLOG(40) << "Server " << server_id_
<< ": Transition to LEADER (Term: " << CurrentTerm() << ")";
// Freeze election timer
next_election_ = TimePoint::max();
election_change_.notify_all();
@@ -436,13 +445,14 @@ void RaftServer::AdvanceCommitIndex() {
// in this way, then all prior entries are committed indirectly because of the
// Log Matching Property."
if (Log()[new_commit_index].term != CurrentTerm()) {
LOG(INFO) << "Server " << server_id_ << ": cannot commit log entry from "
"previous term based on "
"replication count.";
VLOG(40) << "Server " << server_id_
<< ": cannot commit log entry from "
"previous term based on "
"replication count.";
return;
}
LOG(INFO) << "Begin noting comimitted transactions";
VLOG(40) << "Begin noting comimitted transactions";
// Note the newly committed transactions in ReplicationLog
std::set<tx::TransactionId> replicated_tx_ids;
@@ -453,8 +463,7 @@ void RaftServer::AdvanceCommitIndex() {
}
}
for (const auto &tx_id : replicated_tx_ids)
rlog_->set_replicated(tx_id);
for (const auto &tx_id : replicated_tx_ids) rlog_->set_replicated(tx_id);
commit_index_ = new_commit_index;
}
@@ -477,10 +486,10 @@ void RaftServer::SendEntries(uint16_t peer_id,
auto peer_future = coordination_->ExecuteOnWorker<AppendEntriesRes>(
peer_id, [&](int worker_id, auto &client) {
try {
auto res = client.template Call<AppendEntriesRpc>(
server_id_, commit_index_, request_term, request_prev_log_index,
request_prev_log_term, request_entries);
return res;
auto res = client.template Call<AppendEntriesRpc>(
server_id_, commit_index_, request_term, request_prev_log_index,
request_prev_log_term, request_entries);
return res;
} catch (...) {
// not being able to connect to peer means we need to retry.
// TODO(ipaljak): Consider backoff.
@@ -489,9 +498,9 @@ void RaftServer::SendEntries(uint16_t peer_id,
}
});
LOG(INFO) << "Entries size: " << request_entries.size();
VLOG(40) << "Entries size: " << request_entries.size();
lock.unlock(); // Release lock while waiting for response.
lock.unlock(); // Release lock while waiting for response.
auto reply = peer_future.get();
lock.lock();
@@ -513,8 +522,8 @@ void RaftServer::SendEntries(uint16_t peer_id,
<< "Elected leader for term should never change.";
if (reply.term != CurrentTerm()) {
LOG(INFO) << "Server " << server_id_
<< ": Ignoring stale AppendEntriesRPC reply from " << peer_id;
VLOG(40) << "Server " << server_id_
<< ": Ignoring stale AppendEntriesRPC reply from " << peer_id;
return;
}
@@ -539,9 +548,8 @@ void RaftServer::ElectionThreadMain() {
std::unique_lock<std::mutex> lock(lock_);
while (!exiting_) {
if (Clock::now() >= next_election_) {
LOG(INFO) << "Server " << server_id_
<< ": Election timeout exceeded (Term: " << CurrentTerm()
<< ")";
VLOG(40) << "Server " << server_id_
<< ": Election timeout exceeded (Term: " << CurrentTerm() << ")";
Transition(Mode::CANDIDATE);
state_changed_.notify_all();
}
@@ -603,8 +611,8 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
if (CurrentTerm() != request_term || mode_ != Mode::CANDIDATE ||
exiting_) {
LOG(INFO) << "Server " << server_id_
<< ": Ignoring RequestVoteRPC reply from " << peer_id;
VLOG(40) << "Server " << server_id_
<< ": Ignoring RequestVoteRPC reply from " << peer_id;
break;
}
@@ -616,13 +624,13 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
vote_requested_[peer_id] = true;
if (reply.vote_granted) {
LOG(INFO) << "Server " << server_id_ << ": Got vote from "
<< peer_id;
VLOG(40) << "Server " << server_id_ << ": Got vote from "
<< peer_id;
++granted_votes_;
if (HasMajortyVote()) Transition(Mode::LEADER);
} else {
LOG(INFO) << "Server " << server_id_ << ": Denied vote from "
<< peer_id;
VLOG(40) << "Server " << server_id_ << ": Denied vote from "
<< peer_id;
}
state_changed_.notify_all();
@@ -631,9 +639,9 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
case Mode::LEADER: {
if (now >= next_heartbeat_[peer_id]) {
LOG(INFO) << "Server " << server_id_
<< ": Send AppendEntries RPC to server " << peer_id
<< " (Term: " << CurrentTerm() << ")";
VLOG(40) << "Server " << server_id_
<< ": Send AppendEntries RPC to server " << peer_id
<< " (Term: " << CurrentTerm() << ")";
SendEntries(peer_id, lock);
continue;
}
@@ -655,8 +663,7 @@ void RaftServer::NoOpIssuerThreadMain() {
// no_op_create_callback_ will create a new transaction that has a NO_OP
// StateDelta. This will trigger the whole procedure of replicating logs
// in our implementation of Raft.
if (!exiting_)
no_op_create_callback_();
if (!exiting_) no_op_create_callback_();
}
}
@@ -675,8 +682,8 @@ void RaftServer::SetNextElectionTimePoint() {
bool RaftServer::HasMajortyVote() {
if (2 * granted_votes_ > coordination_->WorkerCount()) {
LOG(INFO) << "Server " << server_id_
<< ": Obtained majority vote (Term: " << CurrentTerm() << ")";
VLOG(40) << "Server " << server_id_
<< ": Obtained majority vote (Term: " << CurrentTerm() << ")";
return true;
}
return false;
@@ -740,8 +747,7 @@ void RaftServer::AppendLogEntries(uint64_t leader_commit_index,
log[current_index].term != new_entries[i].term)
DeleteLogSuffix(current_index);
DCHECK(log.size() >= current_index);
if (log.size() == current_index)
log.emplace_back(new_entries[i]);
if (log.size() == current_index) log.emplace_back(new_entries[i]);
}
// See Raft paper 5.3

View File

@@ -95,6 +95,9 @@ class RaftServer final : public RaftInterface {
/// committed in local storage.
bool SafeToCommit(const tx::TransactionId &tx_id) override;
/// Returns true if the current servers mode is LEADER. False otherwise.
bool IsLeader() override;
void GarbageCollectReplicationLog(const tx::TransactionId &tx_id);
private:
@@ -145,10 +148,10 @@ class RaftServer final : public RaftInterface {
database::StateDeltaApplier *delta_applier_{nullptr};
std::unique_ptr<ReplicationLog> rlog_{nullptr};
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
std::atomic<Mode> mode_; ///< Server's current mode.
uint16_t server_id_; ///< ID of the current server.
uint64_t commit_index_; ///< Index of the highest known committed entry.
uint64_t last_applied_; ///< Index of the highest applied entry to SM.
/// Raft log entry buffer.
///
@@ -163,12 +166,12 @@ class RaftServer final : public RaftInterface {
std::condition_variable state_changed_; ///< Notifies all peer threads on
///< relevant state change.
std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op
///< command on leader change.
std::thread no_op_issuer_thread_; ///< Thread responsible for issuing no-op
///< command on leader change.
std::condition_variable leader_changed_; ///< Notifies the no_op_issuer_thread
///< that a new leader has been
///< elected.
std::condition_variable leader_changed_; ///< Notifies the
///< no_op_issuer_thread that a new
///< leader has been elected.
bool exiting_ = false; ///< True on server shutdown.

View File

@@ -15,3 +15,6 @@ add_subdirectory(auth)
# distributed test binaries
add_subdirectory(distributed)
# raft test binaries
add_subdirectory(ha_basic)

View File

@@ -50,3 +50,13 @@
- runner.py # runner script
- ../../../build_debug/memgraph_distributed # memgraph distributed binary
- ../../../build_debug/tests/integration/distributed/tester # tester binary
- name: integration__ha_basic
cd: ha_basic
commands: ./runner.py
infiles:
- runner.py # runner script
- raft.json # raft configuration
- coordination.json # coordination configuration
- ../../../build_debug/memgraph_ha # memgraph distributed binary
- ../../../build_debug/tests/integration/ha_basic/tester # tester binary

View File

@@ -0,0 +1,6 @@
set(target_name memgraph__integration__ha_basic)
set(tester_target_name ${target_name}__tester)
add_executable(${tester_target_name} tester.cpp)
set_target_properties(${tester_target_name} PROPERTIES OUTPUT_NAME tester)
target_link_libraries(${tester_target_name} mg-utils mg-communication)

View File

@@ -0,0 +1,5 @@
[
[1, "127.0.0.1", 10000],
[2, "127.0.0.1", 10001],
[3, "127.0.0.1", 10002]
]

View File

@@ -0,0 +1,6 @@
{
"election_timeout_min": 200,
"election_timeout_max": 500,
"heartbeat_interval": 100,
"replicate_timeout": 100
}

View File

@@ -0,0 +1,181 @@
#!/usr/bin/python3
import argparse
import atexit
import json
import os
import subprocess
import tempfile
import time
import sys
import random
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
PROJECT_DIR = os.path.normpath(os.path.join(SCRIPT_DIR, "..", "..", ".."))
workers = []
def cleanup():
for worker in workers:
worker.kill()
worker.wait()
workers.clear()
def wait_for_server(port, delay=0.1):
cmd = ["nc", "-z", "-w", "1", "127.0.0.1", str(port)]
while subprocess.call(cmd) != 0:
time.sleep(0.01)
time.sleep(delay)
def generate_args(memgraph_binary, temporary_dir, worker_id, raft_config_file,
coordination_config_file):
args = [memgraph_binary]
args.extend(["--server_id", str(worker_id + 1), "--port", str(7687 + worker_id)])
args.extend(["--raft_config_file", raft_config_file])
args.extend(["--coordination_config_file", coordination_config_file])
# Each worker must have a unique durability directory.
args.extend(["--durability_directory",
os.path.join(temporary_dir, "worker" + str(worker_id))])
return args
def execute_step(tester_binary, cluster_size, expected_results, step):
if step == "create":
client = subprocess.Popen([tester_binary, "--step", "create",
"--cluster_size", str(cluster_size)])
elif step == "count":
client = subprocess.Popen([tester_binary, "--step", "count",
"--cluster_size", str(cluster_size), "--expected_results",
str(expected_results)])
else:
return 0
# Check what happened with query execution.
try:
code = client.wait(timeout=30)
except subprocess.TimeoutExpired as e:
client.kill()
raise e
return code
def execute_test(memgraph_binary, tester_binary, raft_config_file,
coordination_config_file, cluster_size):
print("\033[1;36m~~ Executing test with cluster size: %d~~\033[0m" % (cluster_size))
# Get a temporary directory used for durability.
tempdir = tempfile.TemporaryDirectory()
# Start the cluster.
cleanup()
for worker_id in range(cluster_size):
workers.append(subprocess.Popen(generate_args(memgraph_binary,
tempdir.name, worker_id, raft_config_file,
coordination_config_file)))
time.sleep(0.2)
assert workers[worker_id].poll() is None, \
"Worker{} process died prematurely!".format(worker_id)
# Wait for the cluster to start.
for worker_id in range(cluster_size):
wait_for_server(7687 + worker_id)
time.sleep(1)
expected_results = 0
# Array of exit codes.
codes = []
code = execute_step(tester_binary, cluster_size, expected_results, "create")
if code == 0:
expected_results += 1
else:
print("The client process didn't exit cleanly (code %d)!" % code)
codes.append(code)
for i in range(2 * cluster_size):
worker_id = i % cluster_size
workers[worker_id].kill()
workers[worker_id].wait()
print("Killing worker %d" % (worker_id + 1))
time.sleep(2) # allow for possible leader re-election
if random.random() < 0.5:
print("Executing Create query")
code = execute_step(tester_binary, cluster_size, expected_results,
"create")
if code == 0:
expected_results += 1
else:
print("The client process didn't exit cleanly (code %d)!" % code)
codes.append(code)
break
else:
print("Executing Count query")
code = execute_step(tester_binary, cluster_size, expected_results,
"count")
if code != 0:
print("The client process didn't exit cleanly (code %d)!" % code)
codes.append(code)
break
# Bring it back to life.
print("Starting worker %d" % (worker_id + 1))
workers[worker_id] = subprocess.Popen(generate_args(memgraph_binary,
tempdir.name, worker_id, raft_config_file,
coordination_config_file))
time.sleep(0.2)
assert workers[worker_id].poll() is None, \
"Worker{} process died prematurely!".format(worker_id)
wait_for_server(7687 + worker_id)
code = execute_step(tester_binary, cluster_size, expected_results, "count")
if code != 0:
print("The client process didn't exit cleanly (code %d)!" % code)
codes.append(code)
# Terminate all workers.
cleanup()
assert not any(codes), "Something went wrong!"
def find_correct_path(path):
f = os.path.join(PROJECT_DIR, "build", path)
if not os.path.exists(f):
f = os.path.join(PROJECT_DIR, "build_debug", path)
return f
if __name__ == "__main__":
memgraph_binary = find_correct_path("memgraph_ha")
tester_binary = find_correct_path(os.path.join("tests", "integration",
"ha_basic", "tester"))
raft_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
"ha_basic", "raft.json")
coordination_config_file = os.path.join(PROJECT_DIR, "tests", "integration",
"ha_basic", "coordination.json")
parser = argparse.ArgumentParser()
parser.add_argument("--memgraph", default=memgraph_binary)
parser.add_argument("--tester", default=tester_binary)
parser.add_argument("--raft_config_file", default=raft_config_file)
parser.add_argument("--coordination_config_file",
default=coordination_config_file)
args = parser.parse_args()
execute_test(args.memgraph, args.tester,
args.raft_config_file,
args.coordination_config_file, 3)
print("\033[1;32m~~ The test finished successfully ~~\033[0m")
sys.exit(0)

View File

@@ -0,0 +1,79 @@
#include <chrono>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "communication/bolt/client.hpp"
#include "io/network/endpoint.hpp"
#include "io/network/utils.hpp"
#include "utils/timer.hpp"
DEFINE_string(address, "127.0.0.1", "Server address");
DEFINE_int32(port, 7687, "Server port");
DEFINE_int32(cluster_size, 3, "Size of the raft cluster.");
DEFINE_int32(expected_results, -1, "Number of expected nodes.");
DEFINE_string(username, "", "Username for the database");
DEFINE_string(password, "", "Password for the database");
DEFINE_bool(use_ssl, false, "Set to true to connect with SSL to the server.");
DEFINE_string(step, "", "The step to execute (available: create, count)");
using namespace std::chrono_literals;
int main(int argc, char **argv) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
communication::Init();
bool successfull = false;
for (int retry = 0; !successfull && retry < 10; ++retry) {
for (int i = 0; !successfull && i < FLAGS_cluster_size; ++i) {
try {
communication::ClientContext context(FLAGS_use_ssl);
communication::bolt::Client client(&context);
uint16_t port = FLAGS_port + i;
io::network::Endpoint endpoint{FLAGS_address, port};
client.Connect(endpoint, FLAGS_username, FLAGS_password);
if (FLAGS_step == "create") {
client.Execute("create (:Node)", {});
successfull = true;
} else if (FLAGS_step == "count") {
auto result = client.Execute("match (n) return n", {});
if (result.records.size() != FLAGS_expected_results) {
LOG(WARNING) << "Missing data: expected " << FLAGS_expected_results
<< ", got " << result.records.size();
return 2;
}
successfull = true;
} else {
LOG(FATAL) << "Unexpected client step!";
}
} catch (const communication::bolt::ClientQueryException &) {
// This one is not the leader, continue.
continue;
} catch (const communication::bolt::ClientFatalException &) {
// This one seems to be down, continue.
continue;
}
}
if (!successfull) {
LOG(INFO) << "Couldn't find Raft cluster leader, retrying.";
std::this_thread::sleep_for(1s);
}
}
if (!successfull) {
LOG(WARNING) << "Couldn't find Raft cluster leader.";
return 1;
}
return 0;
}

View File

@@ -16,9 +16,9 @@ class RaftMock final : public raft::RaftInterface {
log_[delta.transaction_id].emplace_back(std::move(delta));
}
bool SafeToCommit(const tx::TransactionId &tx_id) override {
return true;
}
bool SafeToCommit(const tx::TransactionId &tx_id) override { return true; }
bool IsLeader() override { return true; }
std::vector<database::StateDelta> GetLogForTx(
const tx::TransactionId &tx_id) {