Revert "Introduce shutdown mode in Raft"
Summary: This reverts commit 08b9197c79.
Reviewers: mferencevic, ipaljak
Reviewed By: mferencevic, ipaljak
Subscribers: pullbot
Differential Revision: https://phabricator.memgraph.io/D2053
This commit is contained in:
parent
1cc71c6ce8
commit
52adbc6a8b
src/raft
@ -92,7 +92,7 @@ void RaftServer::Start() {
|
||||
// [Raft paper 5.1]
|
||||
// "If a server receives a request with a stale term,
|
||||
// it rejects the request"
|
||||
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
|
||||
if (exiting_ || req.term < current_term_) {
|
||||
RequestVoteRes res(false, current_term_);
|
||||
slk::Save(res, res_builder);
|
||||
return;
|
||||
@ -131,7 +131,7 @@ void RaftServer::Start() {
|
||||
// [Raft paper 5.1]
|
||||
// "If a server receives a request with a stale term, it rejects the
|
||||
// request"
|
||||
if (mode_== Mode::SHUTDOWN || req.term < current_term_) {
|
||||
if (exiting_ || req.term < current_term_) {
|
||||
AppendEntriesRes res(false, current_term_);
|
||||
slk::Save(res, res_builder);
|
||||
return;
|
||||
@ -220,7 +220,7 @@ void RaftServer::Start() {
|
||||
HeartbeatReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
|
||||
if (exiting_ || req.term < current_term_) {
|
||||
HeartbeatRes res(false, current_term_);
|
||||
slk::Save(res, res_builder);
|
||||
return;
|
||||
@ -247,7 +247,7 @@ void RaftServer::Start() {
|
||||
InstallSnapshotReq req;
|
||||
slk::Load(&req, req_reader);
|
||||
|
||||
if (mode_ == Mode::SHUTDOWN || req.term < current_term_) {
|
||||
if (exiting_ || req.term < current_term_) {
|
||||
InstallSnapshotRes res(current_term_);
|
||||
slk::Save(res, res_builder);
|
||||
return;
|
||||
@ -339,7 +339,7 @@ void RaftServer::Start() {
|
||||
void RaftServer::Shutdown() {
|
||||
{
|
||||
std::lock_guard<std::mutex> guard(lock_);
|
||||
mode_ = Mode::SHUTDOWN;
|
||||
exiting_ = true;
|
||||
|
||||
state_changed_.notify_all();
|
||||
election_change_.notify_all();
|
||||
@ -476,14 +476,6 @@ bool RaftServer::SafeToCommit(const tx::TransactionId &tx_id) {
|
||||
// information about that tx.
|
||||
throw InvalidReplicationLogLookup();
|
||||
break;
|
||||
case Mode::SHUTDOWN:
|
||||
// TODO(msantl): This is a bug. If we were the Raft leader before the
|
||||
// shutdown was initiated, it is possible that our bolt client threads
|
||||
// still wait for the log to be replicated. This shouldn't be the case
|
||||
// because the cluster might have already started leader re-election, and
|
||||
// we should inform the client about the situation.
|
||||
return true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -491,7 +483,7 @@ void RaftServer::GarbageCollectReplicationLog(const tx::TransactionId &tx_id) {
|
||||
rlog_->garbage_collect_older(tx_id);
|
||||
}
|
||||
|
||||
bool RaftServer::IsLeader() { return mode_ == Mode::LEADER; }
|
||||
bool RaftServer::IsLeader() { return !exiting_ && mode_ == Mode::LEADER; }
|
||||
|
||||
uint64_t RaftServer::TermId() { return current_term_; }
|
||||
|
||||
@ -665,9 +657,6 @@ void RaftServer::Transition(const Mode &new_mode) {
|
||||
leader_changed_.notify_all();
|
||||
break;
|
||||
}
|
||||
|
||||
case Mode::SHUTDOWN:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -771,7 +760,7 @@ void RaftServer::SendLogEntries(
|
||||
return;
|
||||
}
|
||||
|
||||
if (current_term_ != request_term || mode_ != Mode::LEADER) {
|
||||
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -853,7 +842,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
|
||||
return;
|
||||
}
|
||||
|
||||
if (current_term_ != request_term || mode_ != Mode::LEADER) {
|
||||
if (current_term_ != request_term || mode_ != Mode::LEADER || exiting_) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -877,7 +866,7 @@ void RaftServer::SendSnapshot(uint16_t peer_id,
|
||||
|
||||
void RaftServer::ElectionThreadMain() {
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
while (mode_ != Mode::SHUTDOWN) {
|
||||
while (!exiting_) {
|
||||
if (Clock::now() >= next_election_) {
|
||||
VLOG(40) << "Server " << server_id_
|
||||
<< ": Election timeout exceeded (Term: " << current_term_ << ")";
|
||||
@ -899,7 +888,7 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
|
||||
* indefinitely. The safest thing to do is to assume some important part of
|
||||
* state was modified while we were waiting for the response and loop around
|
||||
* to check. */
|
||||
while (mode_ != Mode::SHUTDOWN) {
|
||||
while (!exiting_) {
|
||||
TimePoint now = Clock::now();
|
||||
TimePoint wait_until;
|
||||
|
||||
@ -935,7 +924,8 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
|
||||
reply = RequestVoteRes(false, request_term);
|
||||
}
|
||||
|
||||
if (current_term_ != request_term || mode_ != Mode::CANDIDATE) {
|
||||
if (current_term_ != request_term || mode_ != Mode::CANDIDATE ||
|
||||
exiting_) {
|
||||
VLOG(40) << "Server " << server_id_
|
||||
<< ": Ignoring RequestVoteRPC reply from " << peer_id;
|
||||
break;
|
||||
@ -968,12 +958,9 @@ void RaftServer::PeerThreadMain(uint16_t peer_id) {
|
||||
wait_until = next_replication_[peer_id];
|
||||
break;
|
||||
}
|
||||
|
||||
case Mode::SHUTDOWN:
|
||||
break;
|
||||
}
|
||||
|
||||
if (mode_ == Mode::SHUTDOWN) break;
|
||||
if (exiting_) break;
|
||||
state_changed_.wait_until(lock, wait_until);
|
||||
}
|
||||
}
|
||||
@ -982,7 +969,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
|
||||
utils::ThreadSetName(fmt::format("HBThread{}", peer_id));
|
||||
std::unique_lock<std::mutex> lock(heartbeat_lock_);
|
||||
|
||||
while (mode_ != Mode::SHUTDOWN) {
|
||||
while (!exiting_) {
|
||||
TimePoint wait_until;
|
||||
|
||||
if (!issue_hb_) {
|
||||
@ -1006,6 +993,7 @@ void RaftServer::HBThreadMain(uint16_t peer_id) {
|
||||
}
|
||||
}
|
||||
|
||||
if (exiting_) break;
|
||||
hb_condition_.wait_until(lock, wait_until);
|
||||
}
|
||||
}
|
||||
@ -1014,12 +1002,12 @@ void RaftServer::NoOpIssuerThreadMain() {
|
||||
utils::ThreadSetName(fmt::format("NoOpIssuer"));
|
||||
std::mutex m;
|
||||
auto lock = std::unique_lock<std::mutex>(m);
|
||||
while (mode_ != Mode::SHUTDOWN) {
|
||||
while (!exiting_) {
|
||||
leader_changed_.wait(lock);
|
||||
// no_op_create_callback_ will create a new transaction that has a NO_OP
|
||||
// StateDelta. This will trigger the whole procedure of replicating logs
|
||||
// in our implementation of Raft.
|
||||
if (mode_ != Mode::SHUTDOWN) NoOpCreate();
|
||||
if (!exiting_) NoOpCreate();
|
||||
}
|
||||
}
|
||||
|
||||
@ -1035,7 +1023,7 @@ void RaftServer::SnapshotThread() {
|
||||
// metadata.
|
||||
std::lock_guard<std::mutex> snapshot_guard(snapshot_lock_);
|
||||
std::unique_lock<std::mutex> lock(lock_);
|
||||
if (mode_ == Mode::SHUTDOWN) break;
|
||||
if (exiting_) break;
|
||||
|
||||
uint64_t committed_log_size = last_applied_;
|
||||
auto snapshot_metadata = GetSnapshotMetadata();
|
||||
|
@ -31,7 +31,7 @@ namespace raft {
|
||||
using Clock = std::chrono::system_clock;
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
enum class Mode { FOLLOWER, CANDIDATE, LEADER, SHUTDOWN };
|
||||
enum class Mode { FOLLOWER, CANDIDATE, LEADER };
|
||||
|
||||
inline std::string ModeToString(const Mode &mode) {
|
||||
switch (mode) {
|
||||
@ -41,8 +41,6 @@ inline std::string ModeToString(const Mode &mode) {
|
||||
return "CANDIDATE";
|
||||
case Mode::LEADER:
|
||||
return "LEADER";
|
||||
case Mode::SHUTDOWN:
|
||||
return "SHUTDOWN";
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,6 +211,8 @@ class RaftServer final : public RaftInterface {
|
||||
///< that it should start sending
|
||||
///< heartbeats.
|
||||
|
||||
std::atomic<bool> exiting_{false}; ///< True on server shutdown.
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// volatile state on followers and candidates
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
Loading…
Reference in New Issue
Block a user