Introduce shutdown mode in Raft

Reviewers: msantl

Reviewed By: msantl

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2019
This commit is contained in:
Ivan Paljak 2019-05-08 13:30:40 +02:00
parent 4d8f8be1ab
commit 08b9197c79
2 changed files with 37 additions and 21 deletions

View File

@ -91,7 +91,7 @@ void RaftServer::Start() {
// [Raft paper 5.1]
// "If a server recieves a request with a stale term,
// it rejects the request"
if (exiting_ || req.term < current_term_) {
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
RequestVoteRes res(false, current_term_);
slk::Save(res, res_builder);
return;
@ -130,7 +130,7 @@ void RaftServer::Start() {
// [Raft paper 5.1]
// "If a server receives a request with a stale term, it rejects the
// request"
if (exiting_ || req.term < current_term_) {
if (mode_== Mode::SHUTDOWN || req.term < current_term_) {
AppendEntriesRes res(false, current_term_);
slk::Save(res, res_builder);
return;
@ -219,7 +219,7 @@ void RaftServer::Start() {
HeartbeatReq req;
slk::Load(&req, req_reader);
if (exiting_ || req.term < current_term_) {
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
HeartbeatRes res(false, current_term_);
slk::Save(res, res_builder);
return;
@ -246,7 +246,7 @@ void RaftServer::Start() {
InstallSnapshotReq req;
slk::Load(&req, req_reader);
if (exiting_ || req.term < current_term_) {
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
InstallSnapshotRes res(current_term_);
slk::Save(res, res_builder);
return;
@ -338,7 +338,7 @@ void RaftServer::Start() {
void RaftServer::Shutdown() {
{
std::lock_guard<std::mutex> guard(lock_);
exiting_ = true;
mode_ = Mode::SHUTDOWN;
state_changed_.notify_all();
election_change_.notify_all();
@ -475,6 +475,14 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
// information about that tx.
throw InvalidReplicationLogLookup();
break;
case Mode::SHUTDOWN:
// TODO(msantl): This is a bug. If we were the Raft leader before the
// shutdown was initiated, it is possible that our bolt client threads
// still wait for the log to be replicated. This shouldn't be the case
// because the cluster might have already started leader re-election, and
// we should inform the client about the situation.
return true;
break;
}
}
@ -482,7 +490,7 @@ void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
rlog_->garbage_collect_older(tx_id);
}
bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
bool RaftServer::IsLeader() { return mode_ == Mode::LEADER; }
uint64_t RaftServer::TermId() { return current_term_; }
@ -653,6 +661,9 @@ void RaftServer::Transition(const Mode &new_mode) {
leader_changed_.notify_all();
break;
}
case Mode::SHUTDOWN:
break;
}
}
@ -756,7 +767,7 @@ void RaftServer::SendLogEntries(
return;
}
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
if (current_term_ != request_term || mode_ != Mode::LEADER) {
return;
}
@ -838,7 +849,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
return;
}
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
if (current_term_ != request_term || mode_ != Mode::LEADER) {
return;
}
@ -862,7 +873,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
void RaftServer::ElectionThreadMain() {
std::unique_lock<std::mutex> lock(lock_);
while (!exiting_) {
while (mode_ != Mode::SHUTDOWN) {
if (Clock::now() >= next_election_) {
VLOG(40) << "Server " << server_id_
<< ": Election timeout exceeded (Term: " << current_term_ << ")";
@ -884,7 +895,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
* indefinitely. The safest thing to do is to assume some important part of
* state was modified while we were waiting for the response and loop around
* to check. */
while (!exiting_) {
while (mode_ != Mode::SHUTDOWN) {
TimePoint now = Clock::now();
TimePoint wait_until;
@ -920,8 +931,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
reply = RequestVoteRes(false, request_term);
}
if (current_term_ != request_term || mode_ != Mode::CANDIDATE ||
exiting_) {
if (current_term_ != request_term || mode_ != Mode::CANDIDATE) {
VLOG(40) << "Server " << server_id_
<< ": Ignoring RequestVoteRPC reply from " << peer_id;
break;
@ -954,9 +964,12 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
wait_until = next_replication_[peer_id];
break;
}
case Mode::SHUTDOWN:
break;
}
if (exiting_) break;
if (mode_ == Mode::SHUTDOWN) break;
state_changed_.wait_until(lock, wait_until);
}
}
@ -965,7 +978,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
std::unique_lock<std::mutex> lock(heartbeat_lock_);
while (!exiting_) {
while (mode_ != Mode::SHUTDOWN) {
TimePoint now = Clock::now();
TimePoint wait_until;
@ -1004,9 +1017,12 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
wait_until = next_heartbeat_[peer_id];
break;
}
case Mode::SHUTDOWN:
break;
}
if (exiting_) break;
if (mode_ == Mode::SHUTDOWN) break;
hb_condition_.wait_until(lock, wait_until);
}
}
@ -1015,12 +1031,12 @@ void RaftServer::NoOpIssuerThreadMain() {
utils::ThreadSetName(fmt::format("NoOpIssuer"));
std::mutex m;
auto lock = std::unique_lock<std::mutex>(m);
while (!exiting_) {
while (mode_ != Mode::SHUTDOWN) {
leader_changed_.wait(lock);
// no_op_create_callback_ will create a new transaction that has a NO_OP
// StateDelta. This will trigger the whole procedure of replicating logs
// in our implementation of Raft.
if (!exiting_) NoOpCreate();
if (mode_ != Mode::SHUTDOWN) NoOpCreate();
}
}
@ -1036,7 +1052,7 @@ void RaftServer::SnapshotThread() {
// metadata.
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
std::unique_lock<std::mutex> lock(lock_);
if (exiting_) break;
if (mode_ == Mode::SHUTDOWN) break;
uint64_t committed_log_size = last_applied_;
auto snapshot_metadata = GetSnapshotMetadata();

View File

@ -31,7 +31,7 @@ namespace raft {
using Clock = std::chrono::system_clock;
using TimePoint = std::chrono::system_clock::time_point;
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
enum class Mode { FOLLOWER, CANDIDATE, LEADER, SHUTDOWN };
inline std::string ModeToString(const Mode &mode) {
switch (mode) {
@ -41,6 +41,8 @@ inline std::string ModeToString(const Mode &mode) {
return "CANDIDATE";
case Mode::LEADER:
return "LEADER";
case Mode::SHUTDOWN:
return "SHUTDOWN";
}
}
@ -208,8 +210,6 @@ class RaftServer final : public RaftInterface {
///< that a new leader has been
///< elected.
std::atomic<bool> exiting_{false}; ///< True on server shutdown.
//////////////////////////////////////////////////////////////////////////////
// volatile state on followers and candidates
//////////////////////////////////////////////////////////////////////////////