Fix re-election in Raft

Summary:
Once a leader loses its leadership, we reset the storage and the transaction
engine in order to handle hanging transactions.

This requires re-applying all the committed entries from the log.

Once we add snapshots (log compaction), we will need to recover from the
snapshot here as well.
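
For reference, the reset path in `RaftServer::Transition` now roughly does the
following (a condensed sketch with explanatory comments, not the verbatim
patch; the member names are the ones used in the diff below):

```cpp
// Sketch of the reset performed when leadership is lost.
if (reset) {
  // Freeze the election timer so the reset itself cannot time out and
  // trigger yet another election while we rebuild state.
  next_election_ = TimePoint::max();

  // Drop storage and the transaction engine; this clears any transactions
  // left hanging from the lost leadership.
  reset_callback_();
  ResetReplicationLog();

  // Rebuild the state machine by re-applying every committed log entry.
  // Snapshot (log compaction) recovery will have to be added here too.
  for (int i = 1; i <= commit_index_; ++i)
    delta_applier_->Apply(GetLogEntry(i).deltas);
  last_applied_ = commit_index_;
}
```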

One thing to keep in mind is the `election_timeout_min` parameter. If it is set
too low, it could trigger leader re-election too often.
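
For illustration, this is roughly how a randomized Raft election deadline is
usually picked (a hedged sketch; `election_timeout_max` and the helper name are
assumptions here, only `election_timeout_min` comes from our configuration):

```cpp
#include <chrono>
#include <random>

using Clock = std::chrono::steady_clock;

// Pick the next election deadline uniformly from [min, max], as the Raft
// paper suggests. If election_timeout_min is smaller than the leader's
// heartbeat interval plus the time spent in the reset/re-apply step above,
// followers will keep timing out and forcing needless re-elections.
Clock::time_point NextElectionTimePoint(
    std::chrono::milliseconds election_timeout_min,
    std::chrono::milliseconds election_timeout_max) {
  static std::mt19937_64 rng{std::random_device{}()};
  std::uniform_int_distribution<int64_t> dist(election_timeout_min.count(),
                                              election_timeout_max.count());
  return Clock::now() + std::chrono::milliseconds(dist(rng));
}
```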

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1822
Matija Santl 2019-01-21 16:39:52 +01:00
parent 276672c310
commit 62e06d4b70
4 changed files with 15 additions and 12 deletions

@@ -121,7 +121,7 @@ bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) {
}
void GraphDb::Reset() {
// Release gc scheduler to stop it from touching storage
// Release gc scheduler to stop it from touching storage.
storage_gc_ = nullptr;
storage_ = std::make_unique<Storage>(config_.properties_on_disk);

@@ -128,8 +128,7 @@ void RaftServer::Start() {
// [Raft paper 5.3]
// "Once a follower learns that a log entry is committed, it applies
// the entry to its state machine (in log order)
while (req.leader_commit > last_applied_ &&
last_applied_ + 1 < LogSize()) {
while (req.leader_commit > last_applied_ && last_applied_ + 1 < LogSize()) {
++last_applied_;
delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
}
@@ -156,7 +155,7 @@ void RaftServer::Start() {
// term, then they store the same command.
// - If two entries in different logs have the same index and term,
// then the logs are identical in all preceding entries.
if (LogSize() <= req.prev_log_index ||
if (LogSize() <= req.prev_log_index ||
GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
AppendEntriesRes res(false, current_term);
Save(res, res_builder);
@@ -249,8 +248,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
// Force issuing heartbeats
TimePoint now = Clock::now();
for (auto &peer_heartbeat : next_heartbeat_)
peer_heartbeat = now;
for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now;
state_changed_.notify_all();
}
@@ -340,11 +338,18 @@ void RaftServer::Transition(const Mode &new_mode) {
log_entry_buffer_.Disable();
if (reset) {
VLOG(40) << "Reseting internal state";
// Temporaray freeze election timer while we do the reset.
next_election_ = TimePoint::max();
reset_callback_();
ResetReplicationLog();
// Re-apply raft log.
// TODO(msantl): Implement snapshot recovery also!
for (int i = 1; i <= commit_index_; ++i)
delta_applier_->Apply(GetLogEntry(i).deltas);
last_applied_ = commit_index_;
}
SetNextElectionTimePoint();
@@ -355,7 +360,6 @@ void RaftServer::Transition(const Mode &new_mode) {
case Mode::CANDIDATE: {
VLOG(40) << "Server " << server_id_
<< ": Transition to CANDIDATE (Term: " << CurrentTerm() << ")";
log_entry_buffer_.Disable();
// [Raft thesis, section 3.4]
// "Each candidate restarts its randomized election timeout at the start
@@ -463,7 +467,7 @@ void RaftServer::AdvanceCommitIndex() {
return;
}
VLOG(40) << "Begin noting comimitted transactions";
VLOG(40) << "Begin applying commited transactions";
for (int i = commit_index_ + 1; i <= new_commit_index; ++i) {
auto deltas = GetLogEntry(i).deltas;
@@ -791,8 +795,7 @@ std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) {
std::ios_base::binary);
{
::capnp::MallocMessageBuilder message;
capnp::LogEntry::Builder log_builder =
message.initRoot<capnp::LogEntry>();
capnp::LogEntry::Builder log_builder = message.initRoot<capnp::LogEntry>();
Save(log_entry, &log_builder);
kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream);

@@ -202,8 +202,6 @@ void Engine::Reset() {
clog_ = nullptr;
clog_ = std::make_unique<CommitLog>();
}
// local_lock_graph_ should be empty because all transactions should've finish
// by now.
accepting_transactions_.store(true);
}

@@ -101,6 +101,8 @@ int main(int argc, char **argv) {
break;
}
}
client.Close();
});
}