From 62e06d4b7060bd3e56f07f4a530b5c3ea01a22bf Mon Sep 17 00:00:00 2001
From: Matija Santl
Date: Mon, 21 Jan 2019 16:39:52 +0100
Subject: [PATCH] Fix re-election in Raft

Summary:
Once a leader loses its leadership, we reset the storage and the
transaction engine in order to handle hanging transactions. This requires
re-applying all the committed entries from the log. Once we add snapshots
(log compaction), the snapshot will need to be re-applied here as well.

One thing to keep in mind is the `election_timeout_min` parameter: if it's
set too low, it can trigger leader re-election too often.

Reviewers: ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D1822
---
 src/database/single_node_ha/graph_db.cpp   |  2 +-
 src/raft/raft_server.cpp                   | 21 ++++++++++++---------
 src/transactions/single_node_ha/engine.cpp |  2 --
 tests/feature_benchmark/ha/benchmark.cpp   |  2 ++
 4 files changed, 15 insertions(+), 12 deletions(-)

diff --git a/src/database/single_node_ha/graph_db.cpp b/src/database/single_node_ha/graph_db.cpp
index 01f0019f2..19ba9e349 100644
--- a/src/database/single_node_ha/graph_db.cpp
+++ b/src/database/single_node_ha/graph_db.cpp
@@ -121,7 +121,7 @@ bool GraphDb::MakeSnapshot(GraphDbAccessor &accessor) {
 }
 
 void GraphDb::Reset() {
-  // Release gc scheduler to stop it from touching storage
+  // Release gc scheduler to stop it from touching storage.
   storage_gc_ = nullptr;
   storage_ = std::make_unique<Storage>(config_.properties_on_disk);
 
diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp
index faa652033..a85141272 100644
--- a/src/raft/raft_server.cpp
+++ b/src/raft/raft_server.cpp
@@ -128,8 +128,7 @@ void RaftServer::Start() {
     // [Raft paper 5.3]
     // "Once a follower learns that a log entry is committed, it applies
     // the entry to its state machine (in log order)
-    while (req.leader_commit > last_applied_ &&
-           last_applied_ + 1 < LogSize()) {
+    while (req.leader_commit > last_applied_ && last_applied_ + 1 < LogSize()) {
       ++last_applied_;
       delta_applier_->Apply(GetLogEntry(last_applied_).deltas);
     }
@@ -156,7 +155,7 @@
     //     term, then they store the same command.
     //   - If two entries in different logs have the same index and term,
     //     then the logs are identical in all preceding entries.
-    if (LogSize() <= req.prev_log_index ||
+    if (LogSize() <= req.prev_log_index ||
        GetLogEntry(req.prev_log_index).term != req.prev_log_term) {
      AppendEntriesRes res(false, current_term);
      Save(res, res_builder);
@@ -249,8 +248,7 @@ void RaftServer::AppendToLog(const tx::TransactionId &tx_id,
 
   // Force issuing heartbeats
   TimePoint now = Clock::now();
-  for (auto &peer_heartbeat : next_heartbeat_)
-    peer_heartbeat = now;
+  for (auto &peer_heartbeat : next_heartbeat_) peer_heartbeat = now;
 
   state_changed_.notify_all();
 }
@@ -340,11 +338,18 @@ void RaftServer::Transition(const Mode &new_mode) {
       log_entry_buffer_.Disable();
 
       if (reset) {
+        VLOG(40) << "Resetting internal state";
        // Temporaray freeze election timer while we do the reset.
        next_election_ = TimePoint::max();
        reset_callback_();
        ResetReplicationLog();
+
+        // Re-apply raft log.
+        // TODO(msantl): Implement snapshot recovery also!
+        for (int i = 1; i <= commit_index_; ++i)
+          delta_applier_->Apply(GetLogEntry(i).deltas);
+        last_applied_ = commit_index_;
       }
 
       SetNextElectionTimePoint();
@@ -355,7 +360,6 @@
     case Mode::CANDIDATE: {
       VLOG(40) << "Server " << server_id_
                << ": Transition to CANDIDATE (Term: " << CurrentTerm() << ")";
-      log_entry_buffer_.Disable();
 
       // [Raft thesis, section 3.4]
      // "Each candidate restarts its randomized election timeout at the start
@@ -463,7 +467,7 @@ void RaftServer::AdvanceCommitIndex() {
     return;
   }
 
-  VLOG(40) << "Begin noting comimitted transactions";
+  VLOG(40) << "Begin applying committed transactions";
 
   for (int i = commit_index_ + 1; i <= new_commit_index; ++i) {
     auto deltas = GetLogEntry(i).deltas;
@@ -791,8 +795,7 @@ std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) {
                          std::ios_base::binary);
   {
     ::capnp::MallocMessageBuilder message;
-    capnp::LogEntry::Builder log_builder =
-        message.initRoot<capnp::LogEntry>();
+    capnp::LogEntry::Builder log_builder = message.initRoot<capnp::LogEntry>();
     Save(log_entry, &log_builder);
     kj::std::StdOutputStream std_stream(stream);
     kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
diff --git a/src/transactions/single_node_ha/engine.cpp b/src/transactions/single_node_ha/engine.cpp
index 836ad6047..bc16b2104 100644
--- a/src/transactions/single_node_ha/engine.cpp
+++ b/src/transactions/single_node_ha/engine.cpp
@@ -202,8 +202,6 @@ void Engine::Reset() {
     clog_ = nullptr;
     clog_ = std::make_unique<CommitLog>();
   }
-  // local_lock_graph_ should be empty because all transactions should've finish
-  // by now.
 
   accepting_transactions_.store(true);
 }
diff --git a/tests/feature_benchmark/ha/benchmark.cpp b/tests/feature_benchmark/ha/benchmark.cpp
index 80e0abae1..05e9e092a 100644
--- a/tests/feature_benchmark/ha/benchmark.cpp
+++ b/tests/feature_benchmark/ha/benchmark.cpp
@@ -101,6 +101,8 @@ int main(int argc, char **argv) {
         break;
       }
     }
+
+    client.Close();
   });
 }
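
For readers tracing the recovery path, here is a minimal, self-contained sketch of the step the
summary describes: after a leader steps down and resets its state machine, every committed log
entry is replayed in log order so that last_applied catches up to commit_index. The names below
(Delta, LogEntry, StateMachine, RecoverFromLog) are simplified stand-ins chosen for illustration,
not the actual Memgraph types touched by the patch.

// Sketch only: replay committed Raft log entries after a state-machine reset.
#include <cstdint>
#include <vector>

struct Delta {};  // stand-in for a single state-machine change

struct LogEntry {
  uint64_t term = 0;
  std::vector<Delta> deltas;
};

struct StateMachine {
  void Reset() {}                                        // drop volatile state
  void Apply(const std::vector<Delta> & /*deltas*/) {}   // apply one entry's deltas
};

// Raft log indices are 1-based, so log[0] is an unused sentinel entry.
// Returns the new last_applied value, which must equal commit_index.
uint64_t RecoverFromLog(StateMachine &sm, const std::vector<LogEntry> &log,
                        uint64_t commit_index) {
  sm.Reset();
  for (uint64_t i = 1; i <= commit_index && i < log.size(); ++i)
    sm.Apply(log[i].deltas);
  return commit_index;
}

Note that the patch also freezes the election timer (next_election_ = TimePoint::max()) while this
replay runs; without that, a long replay combined with a small `election_timeout_min` could itself
trigger yet another re-election.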